Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Task manager] Adds ensureScheduled api to allow safer rescheduling of existing tasks #50232

Merged
merged 15 commits into from
Nov 14, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion x-pack/legacy/plugins/lens/server/usage/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ function scheduleTasks(server: Server) {
// function block.
(async () => {
try {
await taskManager.schedule({
await taskManager.ensureScheduling({
id: TASK_ID,
taskType: TELEMETRY_TASK_TYPE,
state: { byDate: {}, suggestionsByDate: {}, saved: {}, runs: 0 },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export function scheduleTask(server) {
// function block.
(async () => {
try {
await taskManager.schedule({
await taskManager.ensureScheduling({
id: TASK_ID,
taskType: TELEMETRY_TASK_TYPE,
state: { stats: {}, runs: 0 },
Expand Down
2 changes: 1 addition & 1 deletion x-pack/legacy/plugins/oss_telemetry/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export interface HapiServer {
};
task_manager: {
registerTaskDefinitions: (opts: any) => void;
schedule: (opts: any) => Promise<void>;
ensureScheduling: (opts: any) => Promise<void>;
fetch: (
opts: any
) => Promise<{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export function scheduleTasks(server: HapiServer) {
// function block.
(async () => {
try {
await taskManager.schedule({
await taskManager.ensureScheduling({
id: `${PLUGIN_ID}-${VIS_TELEMETRY_TASK}`,
taskType: VIS_TELEMETRY_TASK,
state: { stats: {}, runs: 0 },
Expand Down
2 changes: 1 addition & 1 deletion x-pack/legacy/plugins/oss_telemetry/test_utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export const getMockKbnServer = (
xpack_main: {},
task_manager: {
registerTaskDefinitions: (opts: any) => undefined,
schedule: (opts: any) => Promise.resolve(),
ensureScheduling: (opts: any) => Promise.resolve(),
fetch: mockTaskFetch,
},
},
Expand Down
11 changes: 11 additions & 0 deletions x-pack/legacy/plugins/task_manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ The data stored for a task instance looks something like this:

The task manager mixin exposes a taskManager object on the Kibana server which plugins can use to manage scheduled tasks. Each method takes an optional `scope` argument and ensures that only tasks with the specified scope(s) will be affected.

### schedule
Using `schedule` you can instruct TaskManger to schedule an instance of a TaskType at some point in the future.

```js
const taskManager = server.plugins.task_manager;
// Schedules a task. All properties are as documented in the previous
Expand Down Expand Up @@ -256,6 +259,14 @@ const results = await manager.find({ scope: 'my-fanci-app', searchAfter: ['ids']
}
```

### ensureScheduling
When using the `schedule` api to schedule a Task you can provide a hard coded `id` on the Task. This tells TaskManager to use this `id` to identify the Task Instance rather than generate an `id` on its own.
The danger is that in such a situation, a Task with that same `id` might already have been scheduled at some earlier point, and this would result in an error. In some cases, this is the expected behavior, but often you only care about ensuring the task has been _scheduled_ and don't need it to be scheduled a fresh.

To achieve this you should use the `ensureScheduling` api which has the exact same behavior as `schedule`, except it allows the scheduling of a Task with an `id` that's already in assigned to another Task and it will assume that the existing Task is the one you wished to `schedule`, treating this as a successful operation.

### more options

More custom access to the tasks can be done directly via Elasticsearch, though that won't be officially supported, as we can change the document structure at any time.

## Middleware
Expand Down
1 change: 1 addition & 0 deletions x-pack/legacy/plugins/task_manager/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ describe('Task Manager Plugin', () => {
expect(setupResult).toMatchInlineSnapshot(`
Object {
"addMiddleware": [Function],
"ensureScheduling": [Function],
"fetch": [Function],
"registerTaskDefinitions": [Function],
"remove": [Function],
Expand Down
2 changes: 2 additions & 0 deletions x-pack/legacy/plugins/task_manager/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export interface PluginSetupContract {
fetch: TaskManager['fetch'];
remove: TaskManager['remove'];
schedule: TaskManager['schedule'];
ensureScheduling: TaskManager['ensureScheduling'];
addMiddleware: TaskManager['addMiddleware'];
registerTaskDefinitions: TaskManager['registerTaskDefinitions'];
}
Expand Down Expand Up @@ -59,6 +60,7 @@ export class Plugin {
fetch: (...args) => taskManager.fetch(...args),
remove: (...args) => taskManager.remove(...args),
schedule: (...args) => taskManager.schedule(...args),
ensureScheduling: (...args) => taskManager.ensureScheduling(...args),
addMiddleware: (...args) => taskManager.addMiddleware(...args),
registerTaskDefinitions: (...args) => taskManager.registerTaskDefinitions(...args),
};
Expand Down
11 changes: 11 additions & 0 deletions x-pack/legacy/plugins/task_manager/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,17 @@ export interface TaskInstance {
ownerId?: string | null;
}

/**
* A task instance that has an id.
*/
export interface ExistingTaskInstance extends TaskInstance {
/**
* The id of the Elastic document that stores this instance's data. This can
* be passed by the caller when scheduling the task.
*/
id: string;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow, TIL. You can subclass an interface, changing a super-interface's property from optional to not optional. Neat-O!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's changed now, as I'm using a new Utility type marking id as Required ;)
But the name is now TaskInstanceWithId.

}

/**
* A task instance that has an id and is ready for storage.
*/
Expand Down
1 change: 1 addition & 0 deletions x-pack/legacy/plugins/task_manager/task_manager.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const createTaskManagerMock = () => {
const mocked: jest.Mocked<TaskManager> = {
registerTaskDefinitions: jest.fn(),
addMiddleware: jest.fn(),
ensureScheduling: jest.fn(),
schedule: jest.fn(),
fetch: jest.fn(),
remove: jest.fn(),
Expand Down
79 changes: 79 additions & 0 deletions x-pack/legacy/plugins/task_manager/task_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,85 @@ describe('TaskManager', () => {
expect(savedObjectsClient.create).toHaveBeenCalled();
});

test('allows scheduling existing tasks that may have already been scheduled', async () => {
const client = new TaskManager(taskManagerOpts);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
savedObjectsClient.create.mockRejectedValueOnce({
statusCode: 409,
});

client.start();

const result = await client.ensureScheduling({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
});

expect(result.id).toEqual('my-foo-id');
});

test('doesnt ignore failure to scheduling existing tasks for reasons other than already being scheduled', async () => {
const client = new TaskManager(taskManagerOpts);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
savedObjectsClient.create.mockRejectedValueOnce({
statusCode: 500,
});

client.start();

return expect(
client.ensureScheduling({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
})
).rejects.toMatchObject({
statusCode: 500,
});
});

test('doesnt allow naively rescheduling existing tasks that have already been scheduled', async () => {
const client = new TaskManager(taskManagerOpts);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
savedObjectsClient.create.mockRejectedValueOnce({
statusCode: 409,
});

client.start();

return expect(
client.schedule({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
})
).rejects.toMatchObject({
statusCode: 409,
});
});

test('allows and queues removing tasks before starting', async () => {
const client = new TaskManager(taskManagerOpts);
savedObjectsClient.delete.mockResolvedValueOnce({});
Expand Down
23 changes: 23 additions & 0 deletions x-pack/legacy/plugins/task_manager/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
TaskDefinition,
TaskDictionary,
ConcreteTaskInstance,
ExistingTaskInstance,
RunContext,
TaskInstance,
} from './task';
Expand All @@ -29,6 +30,8 @@ import {
} from './task_store';
import { identifyEsError } from './lib/identify_es_error';

const VERSION_CONFLICT_STATUS = 409;

export interface TaskManagerOpts {
logger: Logger;
config: any;
Expand Down Expand Up @@ -219,6 +222,26 @@ export class TaskManager {
return result;
}

/**
* Schedules a task with an Id
*
* @param task - The task being scheduled.
* @returns {Promise<ConcreteTaskInstance>}
*/
public async ensureScheduling(
taskInstance: ExistingTaskInstance,
options?: any
): Promise<ExistingTaskInstance> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I guess sometimes this will return a ConcreteTaskInstance (if not already scheduled), but otherwise the taskInstance sent in. Seems like it would be "nice" if it did always return a ConcreteTaskInstance, but guessing you will have to do another read to get that, and hardly seems worth it, since the caller may not need it. Seems like this should be fine as a first attempt, maybe we'd need to re-look at a way to get the ConcreteTaskInstance back later, if we find we need it for some reason.

In any case, the JSDoc comment above this should be changed to say it's returning an ExistingTaskInstance instead of ConcreteTaskInstance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I did think about that, though, the type it returns will always be TaskInstanceWithId, never an explicit ConcreteTaskInstance, as TaskInstanceWithId is essentially a subtype of ConcreteTaskInstance, but you're right that otherwise we'd have to make an additional read and it didn't seem worth it.
Feels reasonable to leave it such due to the ensure nature, no?

I'll change the JSDoc, thanks for catching 👍

try {
return await this.schedule(taskInstance, options);
} catch (err) {
if (err.statusCode === VERSION_CONFLICT_STATUS) {
return taskInstance;
}
throw err;
}
}

/**
* Fetches a paginatable list of scheduled tasks.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,34 @@ export function initRoutes(server) {
config: {
validate: {
payload: Joi.object({
taskType: Joi.string().required(),
interval: Joi.string().optional(),
params: Joi.object().required(),
state: Joi.object().optional(),
id: Joi.string().optional(),
task: Joi.object({
taskType: Joi.string().required(),
interval: Joi.string().optional(),
params: Joi.object().required(),
state: Joi.object().optional(),
id: Joi.string().optional()
}),
ensureScheduling: Joi.boolean()
.default(false)
.optional(),
}),
},
},
async handler(request) {
try {
const task = await taskManager.schedule({
...request.payload,
const { ensureScheduling = false, task: taskFields } = request.payload;
const task = {
...taskFields,
scope: [scope],
}, { request });
return task;
};

const taskResult = await (
ensureScheduling
? taskManager.ensureScheduling(task, { request })
: taskManager.schedule(task, { request })
);

return taskResult;
} catch (err) {
return err;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,15 @@ export default function ({ getService }) {
function scheduleTask(task) {
return supertest.post('/api/sample_tasks')
.set('kbn-xsrf', 'xxx')
.send(task)
.send({ task })
.expect(200)
.then((response) => response.body);
}

function scheduleTaskIfNotExists(task) {
return supertest.post('/api/sample_tasks')
.set('kbn-xsrf', 'xxx')
.send({ task, ensureScheduling: true })
.expect(200)
.then((response) => response.body);
}
Expand Down Expand Up @@ -116,6 +124,24 @@ export default function ({ getService }) {
expect(result.id).to.be('test-task-for-sample-task-plugin-to-test-task-manager');
});

it('should allow a task with a given ID to be scheduled multiple times', async () => {
const result = await scheduleTaskIfNotExists({
id: 'test-task-to-reschedule-in-task-manager',
taskType: 'sampleTask',
params: { },
});

expect(result.id).to.be('test-task-to-reschedule-in-task-manager');

const rescheduleResult = await scheduleTaskIfNotExists({
id: 'test-task-to-reschedule-in-task-manager',
taskType: 'sampleTask',
params: { },
});

expect(rescheduleResult.id).to.be('test-task-to-reschedule-in-task-manager');
});

it('should reschedule if task errors', async () => {
const task = await scheduleTask({
taskType: 'sampleTask',
Expand Down