Skip to content

Commit

Permalink
[7.x] [Task Manager] Batches the update operations in Task Manager (#…
Browse files Browse the repository at this point in the history
…71470) (#72724)

* [Task Manager] Batches the update operations in Task Manager  (#71470)

This PR attempts to batch update tasks in Task Manager in order to avoid overloading the Elasticsearch queue.
This is the 1st PR addressing #65551

Under the hood we now use a Reactive buffer that accumulates all calls to the `update` API in the TaskStore and flushes either after 50ms or once as many operations as there are workers have been buffered (whichever comes first).

* Removed next-tick scheduling, as we don't use it and its test was too flaky
  • Loading branch information
gmmorris authored Jul 22, 2020
1 parent 6c7b914 commit e389968
Show file tree
Hide file tree
Showing 10 changed files with 636 additions and 7 deletions.
82 changes: 82 additions & 0 deletions x-pack/plugins/task_manager/server/buffered_task_store.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import uuid from 'uuid';
import { taskStoreMock } from './task_store.mock';
import { BufferedTaskStore } from './buffered_task_store';
import { asErr, asOk } from './lib/result_type';
import { TaskStatus } from './task';

// Verifies that BufferedTaskStore delegates to the wrapped TaskStore and that
// each buffered `update` call is settled from its own entry in the
// `bulkUpdate` response.
describe('Buffered Task Store', () => {
  test('proxies the TaskStore for `maxAttempts` and `remove`', async () => {
    const taskStore = taskStoreMock.create({ maxAttempts: 10 });
    taskStore.bulkUpdate.mockResolvedValue([]);
    const bufferedStore = new BufferedTaskStore(taskStore, {});

    expect(bufferedStore.maxAttempts).toEqual(10);

    // `remove` is async: await it so the test never leaves a floating promise behind
    await bufferedStore.remove('1');
    expect(taskStore.remove).toHaveBeenCalledWith('1');
  });

  describe('update', () => {
    test("proxies the TaskStore's `bulkUpdate`", async () => {
      const taskStore = taskStoreMock.create({ maxAttempts: 10 });
      const bufferedStore = new BufferedTaskStore(taskStore, {});

      const task = mockTask();

      taskStore.bulkUpdate.mockResolvedValue([asOk(task)]);

      expect(await bufferedStore.update(task)).toMatchObject(task);
      expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task]);
    });

    test('handles partially successfull bulkUpdates resolving each call appropriately', async () => {
      const taskStore = taskStoreMock.create({ maxAttempts: 10 });
      const bufferedStore = new BufferedTaskStore(taskStore, {});

      const tasks = [mockTask(), mockTask(), mockTask()];

      // a single bulk response carrying one failure between two successes
      taskStore.bulkUpdate.mockResolvedValueOnce([
        asOk(tasks[0]),
        asErr({ entity: tasks[1], error: new Error('Oh no, something went terribly wrong') }),
        asOk(tasks[2]),
      ]);

      const results = [
        bufferedStore.update(tasks[0]),
        bufferedStore.update(tasks[1]),
        bufferedStore.update(tasks[2]),
      ];
      expect(await results[0]).toMatchObject(tasks[0]);
      // `.rejects` returns a promise: without `await` the assertion is detached
      // from the test and a mismatch would not fail it
      await expect(results[1]).rejects.toMatchInlineSnapshot(
        `[Error: Oh no, something went terribly wrong]`
      );
      expect(await results[2]).toMatchObject(tasks[2]);
    });
  });
});

/**
 * Builds a fresh task fixture for the tests above.
 *
 * Every call yields a new uuid-based `id` and new `runAt` / `scheduledAt`
 * Date instances; all remaining fields are fixed values.
 */
function mockTask() {
  const uniqueId = `task_${uuid.v4()}`;

  const fixture = {
    id: uniqueId,
    attempts: 0,
    schedule: undefined,
    params: { hello: 'world' },
    retryAt: null,
    runAt: new Date(),
    scheduledAt: new Date(),
    scope: undefined,
    startedAt: null,
    state: { foo: 'bar' },
    status: TaskStatus.Idle,
    taskType: 'report',
    user: undefined,
    version: '123',
    ownerId: '123',
  };

  return fixture;
}
39 changes: 39 additions & 0 deletions x-pack/plugins/task_manager/server/buffered_task_store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { TaskStore } from './task_store';
import { ConcreteTaskInstance } from './task';
import { Updatable } from './task_runner';
import { createBuffer, Operation, BufferOptions } from './lib/bulk_operation_buffer';
import { unwrapPromise } from './lib/result_type';

/** Default upper bound, in milliseconds, for how long an update may sit in the buffer. */
const DEFAULT_BUFFER_MAX_DURATION = 50;

/**
 * Wraps a `TaskStore` so that individual `update` calls are collected by a
 * bulk-operation buffer and forwarded to `TaskStore.bulkUpdate`, while
 * `maxAttempts` and `remove` are passed straight through to the store.
 */
export class BufferedTaskStore implements Updatable {
  private bufferedUpdate: Operation<ConcreteTaskInstance, Error>;

  /**
   * @param taskStore the store that receives the bulk updates and the proxied calls
   * @param options buffer settings; any value supplied here takes precedence
   *                over the default `bufferMaxDuration`
   */
  constructor(private readonly taskStore: TaskStore, options: BufferOptions) {
    // defaults first, so caller-supplied options win
    const bufferOptions = { bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION, ...options };

    this.bufferedUpdate = createBuffer<ConcreteTaskInstance, Error>(
      (tasks) => taskStore.bulkUpdate(tasks),
      bufferOptions
    );
  }

  /** The `maxAttempts` value of the underlying store. */
  get maxAttempts(): number {
    return this.taskStore.maxAttempts;
  }

  /**
   * Hands `doc` to the update buffer and returns the promise produced by
   * passing the buffered result through `unwrapPromise`.
   */
  async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
    const pendingResult = this.bufferedUpdate(doc);
    return unwrapPromise(pendingResult);
  }

  /** Removes the task with the given id via the underlying store (not buffered). */
  async remove(id: string): Promise<void> {
    return this.taskStore.remove(id);
  }
}
246 changes: 246 additions & 0 deletions x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { createBuffer, Entity, OperationError, BulkOperation } from './bulk_operation_buffer';
import { mapErr, asOk, asErr, Ok, Err } from './result_type';

/**
 * Minimal task shape used by this test suite: the `Entity` type imported from
 * `./bulk_operation_buffer`, extended with an `attempts` counter.
 */
interface TaskInstance extends Entity {
  // attempt counter; the helpers in this file create tasks with 1 and bump it by one per update
  attempts: number;
}

/**
 * Factory for test tasks. Each call returns a task whose id is `task N`,
 * where N is a 1-based counter private to this factory, and whose
 * `attempts` is 1.
 */
const createTask: () => TaskInstance = (() => {
  let sequence = 0;
  return (): TaskInstance => {
    sequence += 1;
    return { id: `task ${sequence}`, attempts: 1 };
  };
})();

/**
 * Returns an `Ok` result wrapping a copy of `task` whose `attempts` is one
 * higher than the input's. The input task is not modified.
 */
function incrementAttempts(task: TaskInstance): Ok<TaskInstance> {
  const bumped: TaskInstance = { ...task, attempts: task.attempts + 1 };
  return asOk(bumped);
}

/**
 * Returns an `Err` result for `task`: the entity is the task with its
 * attempts incremented, paired with a fixed error object carrying a 500
 * status code.
 */
function errorAttempts(task: TaskInstance): Err<OperationError<TaskInstance, Error>> {
  const { value: entity } = incrementAttempts(task);
  const error = { name: '', message: 'Oh no, something went terribly wrong', statusCode: 500 };
  return asErr({ entity, error });
}

// Exercises createBuffer(): batching of individual operation calls, flushing
// by elapsed time and by operation count, and per-call resolution / rejection
// of the bulk result.
describe('Bulk Operation Buffer', () => {
  describe('createBuffer()', () => {
    test('batches up multiple Operation calls', async () => {
      const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
        ([task1, task2]) => {
          return Promise.resolve([incrementAttempts(task1), incrementAttempts(task2)]);
        }
      );

      const bufferedUpdate = createBuffer(bulkUpdate);

      const task1 = createTask();
      const task2 = createTask();

      expect(await Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)])).toMatchObject([
        incrementAttempts(task1),
        incrementAttempts(task2),
      ]);
      expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]);
    });

    test('batch updates can be customised to execute after a certain period', async () => {
      const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn((tasks) => {
        return Promise.resolve(tasks.map(incrementAttempts));
      });

      const bufferMaxDuration = 50;
      const bufferedUpdate = createBuffer(bulkUpdate, { bufferMaxDuration });

      const task1 = createTask();
      const task2 = createTask();
      const task3 = createTask();
      const task4 = createTask();
      const task5 = createTask();
      const task6 = createTask();

      // explicitly typed so that the argument-less `resolve()` below type-checks
      return new Promise<void>((resolve) => {
        Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)]).then((_) => {
          expect(bulkUpdate).toHaveBeenCalledTimes(1);
          expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]);
          expect(bulkUpdate).not.toHaveBeenCalledWith([task3, task4]);
        });

        setTimeout(() => {
          // runs once the first buffer window (bufferMaxDuration) has elapsed
          setTimeout(() => {
            // runs once a second buffer window has elapsed
            expect(bulkUpdate).toHaveBeenCalledTimes(2);
            Promise.all([bufferedUpdate(task5), bufferedUpdate(task6)]).then((_) => {
              expect(bulkUpdate).toHaveBeenCalledTimes(3);
              expect(bulkUpdate).toHaveBeenCalledWith([task5, task6]);
              resolve();
            });
          }, bufferMaxDuration + 1);

          expect(bulkUpdate).toHaveBeenCalledTimes(1);
          Promise.all([bufferedUpdate(task3), bufferedUpdate(task4)]).then((_) => {
            expect(bulkUpdate).toHaveBeenCalledTimes(2);
            expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]);
          });
        }, bufferMaxDuration + 1);
      });
    });

    test('batch updates are executed once queue hits a certain bound', async () => {
      const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn((tasks) => {
        return Promise.resolve(tasks.map(incrementAttempts));
      });

      const bufferedUpdate = createBuffer(bulkUpdate, {
        bufferMaxDuration: 100,
        bufferMaxOperations: 2,
      });

      const task1 = createTask();
      const task2 = createTask();
      const task3 = createTask();
      const task4 = createTask();
      const task5 = createTask();

      // explicitly typed so that the argument-less `resolve()` below type-checks
      return new Promise<void>((resolve) => {
        bufferedUpdate(task1);
        bufferedUpdate(task2);
        bufferedUpdate(task3);
        bufferedUpdate(task4);

        // 50ms is shorter than bufferMaxDuration (100ms), so the two flushes
        // asserted here are expected to come from the operation bound
        setTimeout(() => {
          expect(bulkUpdate).toHaveBeenCalledTimes(2);
          expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]);
          expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]);

          setTimeout(() => {
            expect(bulkUpdate).toHaveBeenCalledTimes(2);
            bufferedUpdate(task5).then((_) => {
              expect(bulkUpdate).toHaveBeenCalledTimes(3);
              expect(bulkUpdate).toHaveBeenCalledWith([task5]);
              resolve();
            });
          }, 50);
        }, 50);
      });
    });

    test('queue upper bound is reset after each flush', async () => {
      const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn((tasks) => {
        return Promise.resolve(tasks.map(incrementAttempts));
      });

      const bufferMaxDuration = 100;
      const bufferedUpdate = createBuffer(bulkUpdate, {
        bufferMaxDuration,
        bufferMaxOperations: 3,
      });

      const task1 = createTask();
      const task2 = createTask();
      const task3 = createTask();
      const task4 = createTask();

      // explicitly typed so that the argument-less `resolve()` below type-checks
      return new Promise<void>((resolve) => {
        // two operations stay below the bound of 3
        bufferedUpdate(task1);
        bufferedUpdate(task2);

        setTimeout(() => {
          expect(bulkUpdate).toHaveBeenCalledTimes(1);
          expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]);

          // two further operations: four in total, but only two since the last flush
          bufferedUpdate(task3);
          bufferedUpdate(task4);

          setTimeout(() => {
            // half a window later nothing new must have been flushed
            expect(bulkUpdate).toHaveBeenCalledTimes(1);

            setTimeout(() => {
              expect(bulkUpdate).toHaveBeenCalledTimes(2);
              expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]);
              resolve();
            }, bufferMaxDuration / 2);
          }, bufferMaxDuration / 2);
        }, bufferMaxDuration + 1);
      });
    });

    // Plain async test: combining `async`, a `done` callback and a returned
    // promise is ambiguous for Jest, so the assertions are simply awaited.
    test('handles both resolutions and rejections at individual task level', async () => {
      const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
        ([task1, task2, task3]) => {
          return Promise.resolve([
            incrementAttempts(task1),
            errorAttempts(task2),
            incrementAttempts(task3),
          ]);
        }
      );

      const bufferedUpdate = createBuffer(bulkUpdate);

      const task1 = createTask();
      const task2 = createTask();
      const task3 = createTask();

      await Promise.all([
        expect(bufferedUpdate(task1)).resolves.toMatchObject(incrementAttempts(task1)),
        expect(bufferedUpdate(task2)).rejects.toMatchObject(
          mapErr(
            (err: OperationError<TaskInstance, Error>) => asErr(err.error),
            errorAttempts(task2)
          )
        ),
        expect(bufferedUpdate(task3)).resolves.toMatchObject(incrementAttempts(task3)),
      ]);

      expect(bulkUpdate).toHaveBeenCalledTimes(1);
    });

    // Plain async test, for the same reason as the previous one.
    test('handles bulkUpdate failure', async () => {
      const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(() => {
        return Promise.reject(new Error('bulkUpdate is an illusion'));
      });

      const bufferedUpdate = createBuffer(bulkUpdate);

      const task1 = createTask();
      const task2 = createTask();
      const task3 = createTask();

      // a failure of the whole bulk call must reject every buffered operation
      await Promise.all([
        expect(bufferedUpdate(task1)).rejects.toMatchInlineSnapshot(`
          Object {
            "error": [Error: bulkUpdate is an illusion],
            "tag": "err",
          }
        `),
        expect(bufferedUpdate(task2)).rejects.toMatchInlineSnapshot(`
          Object {
            "error": [Error: bulkUpdate is an illusion],
            "tag": "err",
          }
        `),
        expect(bufferedUpdate(task3)).rejects.toMatchInlineSnapshot(`
          Object {
            "error": [Error: bulkUpdate is an illusion],
            "tag": "err",
          }
        `),
      ]);

      expect(bulkUpdate).toHaveBeenCalledTimes(1);
    });
  });
});
Loading

0 comments on commit e389968

Please sign in to comment.