diff --git a/x-pack/plugins/task_manager/server/buffered_task_store.ts b/x-pack/plugins/task_manager/server/buffered_task_store.ts new file mode 100644 index 0000000000000..9836e8cda592b --- /dev/null +++ b/x-pack/plugins/task_manager/server/buffered_task_store.ts @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { TaskStore } from './task_store'; +import { ConcreteTaskInstance } from './task'; +import { Updatable } from './task_runner'; +import { createBuffer, Operation } from './lib/bulk_operation_buffer'; +import { unwrapPromise, mapErr } from './lib/result_type'; + +export class BufferedTaskStore implements Updatable { + private bufferedUpdate: Operation; + constructor(private readonly taskStore: TaskStore) { + this.bufferedUpdate = createBuffer(async (docs) => { + return (await taskStore.bulkUpdate(docs)).map((entityOrError, index) => + mapErr( + (error: Error) => ({ + entity: docs[index], + error, + }), + entityOrError + ) + ); + }); + } + + public get maxAttempts(): number { + return this.taskStore.maxAttempts; + } + + public async update(doc: ConcreteTaskInstance): Promise { + return unwrapPromise(this.bufferedUpdate(doc)); + } + + public async remove(id: string): Promise { + return this.taskStore.remove(id); + } +} diff --git a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts new file mode 100644 index 0000000000000..87e383b311a86 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts @@ -0,0 +1,167 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { createBuffer, Entity, OperationError, BulkOperation } from './bulk_operation_buffer'; +import { mapErr, asOk, asErr, Ok, Err } from './result_type'; + +interface TaskInstance extends Entity { + attempts: number; +} + +const createTask = (function (): () => TaskInstance { + let counter = 0; + return () => ({ + id: `task ${++counter}`, + attempts: 1, + }); +})(); + +function incrementAttempts(task: TaskInstance): Ok { + return asOk({ + ...task, + attempts: task.attempts + 1, + }); +} + +function errorAttempts(task: TaskInstance): Err> { + return asErr({ + entity: incrementAttempts(task).value, + error: { name: '', message: 'Oh no, something went terribly wrong', statusCode: 500 }, + }); +} + +describe('Task Store Buffer', () => { + describe('createBuffer()', () => { + test('batches up multiple Operation calls', async () => { + const bulkUpdate: jest.Mocked> = jest.fn( + ([task1, task2]) => { + return Promise.resolve([incrementAttempts(task1), incrementAttempts(task2)]); + } + ); + + const bufferedUpdate = createBuffer(bulkUpdate); + + const task1 = createTask(); + const task2 = createTask(); + + expect(await Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)])).toMatchObject([ + incrementAttempts(task1), + incrementAttempts(task2), + ]); + expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]); + }); + + test('batch updates are executed at most by the next Event Loop tick', async () => { + const bulkUpdate: jest.Mocked> = jest.fn((tasks) => { + return Promise.resolve(tasks.map(incrementAttempts)); + }); + + const bufferedUpdate = createBuffer(bulkUpdate); + + const task1 = createTask(); + const task2 = createTask(); + const task3 = createTask(); + const task4 = createTask(); + const task5 = createTask(); + const task6 = createTask(); + + return new Promise((resolve) => { + Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)]).then((_) => { + expect(bulkUpdate).toHaveBeenCalledTimes(1); + expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]); + expect(bulkUpdate).not.toHaveBeenCalledWith([task3, task4]); + }); + + setTimeout(() => { + // on next tick + setTimeout(() => { + // on next tick + expect(bulkUpdate).toHaveBeenCalledTimes(2); + Promise.all([bufferedUpdate(task5), bufferedUpdate(task6)]).then((_) => { + expect(bulkUpdate).toHaveBeenCalledTimes(3); + expect(bulkUpdate).toHaveBeenCalledWith([task5, task6]); + resolve(); + }); + }, 0); + + expect(bulkUpdate).toHaveBeenCalledTimes(1); + Promise.all([bufferedUpdate(task3), bufferedUpdate(task4)]).then((_) => { + expect(bulkUpdate).toHaveBeenCalledTimes(2); + expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]); + }); + }, 0); + }); + }); + + test('handles both resolutions and rejections at individual task level', async (done) => { + const bulkUpdate: jest.Mocked> = jest.fn( + ([task1, task2, task3]) => { + return Promise.resolve([ + incrementAttempts(task1), + errorAttempts(task2), + incrementAttempts(task3), + ]); + } + ); + + const bufferedUpdate = createBuffer(bulkUpdate); + + const task1 = createTask(); + const task2 = createTask(); + const task3 = createTask(); + + return Promise.all([ + expect(bufferedUpdate(task1)).resolves.toMatchObject(incrementAttempts(task1)), + expect(bufferedUpdate(task2)).rejects.toMatchObject( + mapErr( + (err: OperationError) => asErr(err.error), + errorAttempts(task2) + ) + ), + expect(bufferedUpdate(task3)).resolves.toMatchObject(incrementAttempts(task3)), + ]).then(() => { + expect(bulkUpdate).toHaveBeenCalledTimes(1); + done(); + }); + }); + + test('handles bulkUpdate failure', async (done) => { + const bulkUpdate: jest.Mocked> = jest.fn(() => { + return Promise.reject(new Error('bulkUpdate is an illusion')); + }); + + const bufferedUpdate = createBuffer(bulkUpdate); + + const task1 = createTask(); + const task2 = createTask(); + const task3 = createTask(); + + return Promise.all([ + expect(bufferedUpdate(task1)).rejects.toMatchInlineSnapshot(` + Object { + "error": [Error: bulkUpdate is an illusion], + "tag": "err", + } + `), + expect(bufferedUpdate(task2)).rejects.toMatchInlineSnapshot(` + Object { + "error": [Error: bulkUpdate is an illusion], + "tag": "err", + } + `), + expect(bufferedUpdate(task3)).rejects.toMatchInlineSnapshot(` + Object { + "error": [Error: bulkUpdate is an illusion], + "tag": "err", + } + `), + ]).then(() => { + expect(bulkUpdate).toHaveBeenCalledTimes(1); + done(); + }); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts new file mode 100644 index 0000000000000..a02a96607f4ec --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { keyBy, map } from 'lodash'; +import { Subject } from 'rxjs'; +import { bufferWhen, filter } from 'rxjs/operators'; +import { either, Result, asOk, asErr, Ok, Err } from './result_type'; + +export interface Entity { + id: string; +} + +export interface OperationError { + entity: H; + error: E; +} + +export type OperationResult = Result>; + +export type Operation = (entity: H) => Promise>; +export type BulkOperation = (entities: H[]) => Promise>>; + +export function createBuffer( + bulkOperation: BulkOperation +): Operation { + const flushBuffer = new Subject(); + const storeUpdateBuffer = new Subject<{ + entity: H; + onSuccess: (entity: Ok) => void; + onFailure: (error: Err) => void; + }>(); + + storeUpdateBuffer + .pipe( + bufferWhen(() => flushBuffer), + filter((tasks) => tasks.length > 0) + ) + .subscribe((entities) => { + const entityById = keyBy(entities, ({ entity: { id } }) => id); + bulkOperation(map(entities, 'entity')) + .then((results) => { + results.forEach((result) => + either( + result, + (entity) => { + entityById[entity.id].onSuccess(asOk(entity)); + }, + ({ entity, error }: OperationError) => { + entityById[entity.id].onFailure(asErr(error)); + } + ) + ); + }) + .catch((ex) => { + entities.forEach(({ onFailure }) => onFailure(asErr(ex))); + }); + }); + + return async function (entity: H) { + return new Promise((resolve, reject) => { + // ensure we flush by the end of the "current" event loop tick + setImmediate(() => flushBuffer.next()); + storeUpdateBuffer.next({ entity, onSuccess: resolve, onFailure: reject }); + }); + }; +} diff --git a/x-pack/plugins/task_manager/server/lib/result_type.ts b/x-pack/plugins/task_manager/server/lib/result_type.ts index edf4d84dd226d..a7fbdf80679d7 100644 --- a/x-pack/plugins/task_manager/server/lib/result_type.ts +++ b/x-pack/plugins/task_manager/server/lib/result_type.ts @@ -47,6 +47,14 @@ export async function promiseResult(future: Promise): Promise(future: Promise>): Promise { + return map( + await future, + (value: T) => Promise.resolve(value), + (err: E) => Promise.reject(err) + ); +} + export function unwrap(result: Result): T | E { return isOk(result) ? result.value : result.error; } diff --git a/x-pack/plugins/task_manager/server/task_manager.ts b/x-pack/plugins/task_manager/server/task_manager.ts index 23cb33cfac6c2..615d69085136c 100644 --- a/x-pack/plugins/task_manager/server/task_manager.ts +++ b/x-pack/plugins/task_manager/server/task_manager.ts @@ -57,6 +57,7 @@ import { } from './task_store'; import { identifyEsError } from './lib/identify_es_error'; import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields'; +import { BufferedTaskStore } from './buffered_task_store'; const VERSION_CONFLICT_STATUS = 409; @@ -90,7 +91,10 @@ export type TaskLifecycleEvent = TaskMarkRunning | TaskRun | TaskClaim | TaskRun */ export class TaskManager { private definitions: TaskDictionary = {}; + private store: TaskStore; + private bufferedStore: BufferedTaskStore; + private logger: Logger; private pool: TaskPool; // all task related events (task claimed, task marked as running, etc.) are emitted through events$ @@ -139,6 +143,8 @@ export class TaskManager { // pipe store events into the TaskManager's event stream this.store.events.subscribe((event) => this.events$.next(event)); + this.bufferedStore = new BufferedTaskStore(this.store); + this.pool = new TaskPool({ logger: this.logger, maxWorkers: opts.config.max_workers, @@ -165,7 +171,7 @@ export class TaskManager { return new TaskManagerRunner({ logger: this.logger, instance, - store: this.store, + store: this.bufferedStore, definitions: this.definitions, beforeRun: this.middleware.beforeRun, beforeMarkRunning: this.middleware.beforeMarkRunning, diff --git a/x-pack/plugins/task_manager/server/task_runner.ts b/x-pack/plugins/task_manager/server/task_runner.ts index 4c690a5675f61..ebf13fac2f311 100644 --- a/x-pack/plugins/task_manager/server/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_runner.ts @@ -49,7 +49,7 @@ export interface TaskRunner { toString: () => string; } -interface Updatable { +export interface Updatable { readonly maxAttempts: number; update(doc: ConcreteTaskInstance): Promise; remove(id: string): Promise; diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 4a691e17011e8..984bdef376582 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -17,9 +17,10 @@ import { SavedObjectsSerializer, SavedObjectsRawDoc, ISavedObjectsRepository, + SavedObjectsUpdateResponse, } from '../../../../src/core/server'; -import { asOk, asErr } from './lib/result_type'; +import { asOk, asErr, Result } from './lib/result_type'; import { ConcreteTaskInstance, @@ -332,6 +333,50 @@ export class TaskStore { ); } + /** + * Updates the specified docs in the index, returning the docs + * with their versions up to date. + * + * @param {Array} docs + * @returns {Promise>} + */ + public async bulkUpdate( + docs: ConcreteTaskInstance[] + ): Promise>> { + const attributesByDocId = docs.reduce((attrsById, doc) => { + attrsById.set(doc.id, taskInstanceToAttributes(doc)); + return attrsById; + }, new Map()); + + const updatedSavedObjects: Array = ( + await this.savedObjectsRepository.bulkUpdate( + docs.map((doc) => ({ + type: 'task', + id: doc.id, + options: { version: doc.version }, + attributes: attributesByDocId.get(doc.id)!, + })), + { + refresh: false, + } + ) + ).saved_objects; + + return updatedSavedObjects.map>((updatedSavedObject) => + isSavedObjectsUpdateResponse(updatedSavedObject) + ? asOk( + savedObjectToConcreteTaskInstance({ + ...updatedSavedObject, + attributes: defaults( + updatedSavedObject.attributes, + attributesByDocId.get(updatedSavedObject.id)! + ), + }) + ) + : asErr(updatedSavedObject) + ); + } + /** * Removes the specified task from the index. * @@ -468,3 +513,9 @@ function ensureQueryOnlyReturnsTaskObjects(opts: SearchOpts): SearchOpts { query, }; } + +function isSavedObjectsUpdateResponse( + result: SavedObjectsUpdateResponse | Error +): result is SavedObjectsUpdateResponse { + return result && typeof (result as SavedObjectsUpdateResponse).id === 'string'; +}