Skip to content

Commit

Permalink
make updates buffered in task manager
Browse files Browse the repository at this point in the history
  • Loading branch information
gmmorris committed Jul 13, 2020
1 parent 2a43b48 commit 19246f5
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 3 deletions.
40 changes: 40 additions & 0 deletions x-pack/plugins/task_manager/server/buffered_task_store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { TaskStore } from './task_store';
import { ConcreteTaskInstance } from './task';
import { Updatable } from './task_runner';
import { createBuffer, Operation } from './lib/bulk_operation_buffer';
import { unwrapPromise, mapErr } from './lib/result_type';

export class BufferedTaskStore implements Updatable {
private bufferedUpdate: Operation<ConcreteTaskInstance, Error>;
constructor(private readonly taskStore: TaskStore) {
this.bufferedUpdate = createBuffer<ConcreteTaskInstance, Error>(async (docs) => {
return (await taskStore.bulkUpdate(docs)).map((entityOrError, index) =>
mapErr(
(error: Error) => ({
entity: docs[index],
error,
}),
entityOrError
)
);
});
}

public get maxAttempts(): number {
return this.taskStore.maxAttempts;
}

public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
return unwrapPromise(this.bufferedUpdate(doc));
}

public async remove(id: string): Promise<void> {
return this.taskStore.remove(id);
}
}
167 changes: 167 additions & 0 deletions x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { createBuffer, Entity, OperationError, BulkOperation } from './bulk_operation_buffer';
import { mapErr, asOk, asErr, Ok, Err } from './result_type';

interface TaskInstance extends Entity {
attempts: number;
}

const createTask = (function (): () => TaskInstance {
let counter = 0;
return () => ({
id: `task ${++counter}`,
attempts: 1,
});
})();

function incrementAttempts(task: TaskInstance): Ok<TaskInstance> {
return asOk({
...task,
attempts: task.attempts + 1,
});
}

function errorAttempts(task: TaskInstance): Err<OperationError<TaskInstance, Error>> {
return asErr({
entity: incrementAttempts(task).value,
error: { name: '', message: 'Oh no, something went terribly wrong', statusCode: 500 },
});
}

describe('Task Store Buffer', () => {
describe('createBuffer()', () => {
test('batches up multiple Operation calls', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
([task1, task2]) => {
return Promise.resolve([incrementAttempts(task1), incrementAttempts(task2)]);
}
);

const bufferedUpdate = createBuffer(bulkUpdate);

const task1 = createTask();
const task2 = createTask();

expect(await Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)])).toMatchObject([
incrementAttempts(task1),
incrementAttempts(task2),
]);
expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]);
});

test('batch updates are executed at most by the next Event Loop tick', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn((tasks) => {
return Promise.resolve(tasks.map(incrementAttempts));
});

const bufferedUpdate = createBuffer(bulkUpdate);

const task1 = createTask();
const task2 = createTask();
const task3 = createTask();
const task4 = createTask();
const task5 = createTask();
const task6 = createTask();

return new Promise((resolve) => {
Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)]).then((_) => {
expect(bulkUpdate).toHaveBeenCalledTimes(1);
expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]);
expect(bulkUpdate).not.toHaveBeenCalledWith([task3, task4]);
});

setTimeout(() => {
// on next tick
setTimeout(() => {
// on next tick
expect(bulkUpdate).toHaveBeenCalledTimes(2);
Promise.all([bufferedUpdate(task5), bufferedUpdate(task6)]).then((_) => {
expect(bulkUpdate).toHaveBeenCalledTimes(3);
expect(bulkUpdate).toHaveBeenCalledWith([task5, task6]);
resolve();
});
}, 0);

expect(bulkUpdate).toHaveBeenCalledTimes(1);
Promise.all([bufferedUpdate(task3), bufferedUpdate(task4)]).then((_) => {
expect(bulkUpdate).toHaveBeenCalledTimes(2);
expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]);
});
}, 0);
});
});

test('handles both resolutions and rejections at individual task level', async (done) => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
([task1, task2, task3]) => {
return Promise.resolve([
incrementAttempts(task1),
errorAttempts(task2),
incrementAttempts(task3),
]);
}
);

const bufferedUpdate = createBuffer(bulkUpdate);

const task1 = createTask();
const task2 = createTask();
const task3 = createTask();

return Promise.all([
expect(bufferedUpdate(task1)).resolves.toMatchObject(incrementAttempts(task1)),
expect(bufferedUpdate(task2)).rejects.toMatchObject(
mapErr(
(err: OperationError<TaskInstance, Error>) => asErr(err.error),
errorAttempts(task2)
)
),
expect(bufferedUpdate(task3)).resolves.toMatchObject(incrementAttempts(task3)),
]).then(() => {
expect(bulkUpdate).toHaveBeenCalledTimes(1);
done();
});
});

test('handles bulkUpdate failure', async (done) => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(() => {
return Promise.reject(new Error('bulkUpdate is an illusion'));
});

const bufferedUpdate = createBuffer(bulkUpdate);

const task1 = createTask();
const task2 = createTask();
const task3 = createTask();

return Promise.all([
expect(bufferedUpdate(task1)).rejects.toMatchInlineSnapshot(`
Object {
"error": [Error: bulkUpdate is an illusion],
"tag": "err",
}
`),
expect(bufferedUpdate(task2)).rejects.toMatchInlineSnapshot(`
Object {
"error": [Error: bulkUpdate is an illusion],
"tag": "err",
}
`),
expect(bufferedUpdate(task3)).rejects.toMatchInlineSnapshot(`
Object {
"error": [Error: bulkUpdate is an illusion],
"tag": "err",
}
`),
]).then(() => {
expect(bulkUpdate).toHaveBeenCalledTimes(1);
done();
});
});
});
});
69 changes: 69 additions & 0 deletions x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { keyBy, map } from 'lodash';
import { Subject } from 'rxjs';
import { bufferWhen, filter } from 'rxjs/operators';
import { either, Result, asOk, asErr, Ok, Err } from './result_type';

export interface Entity {
id: string;
}

export interface OperationError<H, E> {
entity: H;
error: E;
}

export type OperationResult<H, E> = Result<H, OperationError<H, E>>;

export type Operation<H, E> = (entity: H) => Promise<Result<H, E>>;
export type BulkOperation<H, E> = (entities: H[]) => Promise<Array<OperationResult<H, E>>>;

export function createBuffer<H extends Entity, E>(
bulkOperation: BulkOperation<H, E>
): Operation<H, E> {
const flushBuffer = new Subject<void>();
const storeUpdateBuffer = new Subject<{
entity: H;
onSuccess: (entity: Ok<H>) => void;
onFailure: (error: Err<E>) => void;
}>();

storeUpdateBuffer
.pipe(
bufferWhen(() => flushBuffer),
filter((tasks) => tasks.length > 0)
)
.subscribe((entities) => {
const entityById = keyBy(entities, ({ entity: { id } }) => id);
bulkOperation(map(entities, 'entity'))
.then((results) => {
results.forEach((result) =>
either(
result,
(entity) => {
entityById[entity.id].onSuccess(asOk(entity));
},
({ entity, error }: OperationError<H, E>) => {
entityById[entity.id].onFailure(asErr(error));
}
)
);
})
.catch((ex) => {
entities.forEach(({ onFailure }) => onFailure(asErr(ex)));
});
});

return async function (entity: H) {
return new Promise((resolve, reject) => {
// ensure we flush by the end of the "current" event loop tick
setImmediate(() => flushBuffer.next());
storeUpdateBuffer.next({ entity, onSuccess: resolve, onFailure: reject });
});
};
}
8 changes: 8 additions & 0 deletions x-pack/plugins/task_manager/server/lib/result_type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ export async function promiseResult<T, E>(future: Promise<T>): Promise<Result<T,
}
}

export async function unwrapPromise<T, E>(future: Promise<Result<T, E>>): Promise<T> {
return map(
await future,
(value: T) => Promise.resolve(value),
(err: E) => Promise.reject(err)
);
}

export function unwrap<T, E>(result: Result<T, E>): T | E {
return isOk(result) ? result.value : result.error;
}
Expand Down
8 changes: 7 additions & 1 deletion x-pack/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import {
} from './task_store';
import { identifyEsError } from './lib/identify_es_error';
import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields';
import { BufferedTaskStore } from './buffered_task_store';

const VERSION_CONFLICT_STATUS = 409;

Expand Down Expand Up @@ -90,7 +91,10 @@ export type TaskLifecycleEvent = TaskMarkRunning | TaskRun | TaskClaim | TaskRun
*/
export class TaskManager {
private definitions: TaskDictionary<TaskDefinition> = {};

private store: TaskStore;
private bufferedStore: BufferedTaskStore;

private logger: Logger;
private pool: TaskPool;
// all task related events (task claimed, task marked as running, etc.) are emitted through events$
Expand Down Expand Up @@ -139,6 +143,8 @@ export class TaskManager {
// pipe store events into the TaskManager's event stream
this.store.events.subscribe((event) => this.events$.next(event));

this.bufferedStore = new BufferedTaskStore(this.store);

this.pool = new TaskPool({
logger: this.logger,
maxWorkers: opts.config.max_workers,
Expand All @@ -165,7 +171,7 @@ export class TaskManager {
return new TaskManagerRunner({
logger: this.logger,
instance,
store: this.store,
store: this.bufferedStore,
definitions: this.definitions,
beforeRun: this.middleware.beforeRun,
beforeMarkRunning: this.middleware.beforeMarkRunning,
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export interface TaskRunner {
toString: () => string;
}

interface Updatable {
export interface Updatable {
readonly maxAttempts: number;
update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance>;
remove(id: string): Promise<void>;
Expand Down
Loading

0 comments on commit 19246f5

Please sign in to comment.