diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index a02123c4a3f8d..78be118507954 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -7,7 +7,7 @@ import _ from 'lodash'; import sinon from 'sinon'; import uuid from 'uuid'; -import { filter } from 'rxjs/operators'; +import { filter, first } from 'rxjs/operators'; import { Option, some, none } from 'fp-ts/lib/Option'; import { @@ -66,8 +66,21 @@ const mockedDate = new Date('2019-02-12T21:01:22.479Z'); describe('TaskStore', () => { describe('schedule', () => { + let store: TaskStore; + + beforeAll(() => { + store = new TaskStore({ + index: 'tasky', + taskManagerId: '', + serializer, + callCluster: jest.fn(), + maxAttempts: 2, + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, + }); + }); + async function testSchedule(task: unknown) { - const callCluster = jest.fn(); savedObjectsClient.create.mockImplementation(async (type: string, attributes: unknown) => ({ id: 'testid', type, @@ -75,15 +88,6 @@ describe('TaskStore', () => { references: [], version: '123', })); - const store = new TaskStore({ - index: 'tasky', - taskManagerId: '', - serializer, - callCluster, - maxAttempts: 2, - definitions: taskDefinitions, - savedObjectsRepository: savedObjectsClient, - }); const result = await store.schedule(task as TaskInstance); expect(savedObjectsClient.create).toHaveBeenCalledTimes(1); @@ -176,12 +180,28 @@ describe('TaskStore', () => { /Unsupported task type "nope"/i ); }); + + test('pushes error from saved objects client to errors$', async () => { + const task: TaskInstance = { + id: 'id', + params: { hello: 'world' }, + state: { foo: 'bar' }, + taskType: 'report', + }; + + const firstErrorPromise = store.errors$.pipe(first()).toPromise(); + savedObjectsClient.create.mockRejectedValue(new Error('Failure')); + await expect(store.schedule(task)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`); + expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`); + }); }); describe('fetch', () => { - async function testFetch(opts?: SearchOpts, hits: unknown[] = []) { - const callCluster = sinon.spy(async (name: string, params?: unknown) => ({ hits: { hits } })); - const store = new TaskStore({ + let store: TaskStore; + const callCluster = jest.fn(); + + beforeAll(() => { + store = new TaskStore({ index: 'tasky', taskManagerId: '', serializer, @@ -190,15 +210,19 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, }); + }); + + async function testFetch(opts?: SearchOpts, hits: unknown[] = []) { + callCluster.mockResolvedValue({ hits: { hits } }); const result = await store.fetch(opts); - sinon.assert.calledOnce(callCluster); - sinon.assert.calledWith(callCluster, 'search'); + expect(callCluster).toHaveBeenCalledTimes(1); + expect(callCluster).toHaveBeenCalledWith('search', expect.anything()); return { result, - args: callCluster.args[0][1], + args: callCluster.mock.calls[0][1], }; } @@ -230,6 +254,13 @@ describe('TaskStore', () => { }, }); }); + + test('pushes error from call cluster to errors$', async () => { + const firstErrorPromise = store.errors$.pipe(first()).toPromise(); + callCluster.mockRejectedValue(new Error('Failure')); + await expect(store.fetch()).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`); + expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`); + }); }); describe('claimAvailableTasks', () => { @@ -831,9 +862,46 @@ if (doc['task.runAt'].size()!=0) { }, ]); }); + + test('pushes error from saved objects client to errors$', async () => { + const callCluster = jest.fn(); + const store = new TaskStore({ + index: 'tasky', + taskManagerId: '', + serializer, + callCluster, + definitions: taskDefinitions, + maxAttempts: 2, + savedObjectsRepository: savedObjectsClient, + }); + + const firstErrorPromise = store.errors$.pipe(first()).toPromise(); + callCluster.mockRejectedValue(new Error('Failure')); + await expect( + store.claimAvailableTasks({ + claimOwnershipUntil: new Date(), + size: 10, + }) + ).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`); + expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`); + }); }); describe('update', () => { + let store: TaskStore; + + beforeAll(() => { + store = new TaskStore({ + index: 'tasky', + taskManagerId: '', + serializer, + callCluster: jest.fn(), + maxAttempts: 2, + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, + }); + }); + test('refreshes the index, handles versioning', async () => { const task = { runAt: mockedDate, @@ -862,16 +930,6 @@ if (doc['task.runAt'].size()!=0) { } ); - const store = new TaskStore({ - index: 'tasky', - taskManagerId: '', - serializer, - callCluster: jest.fn(), - maxAttempts: 2, - definitions: taskDefinitions, - savedObjectsRepository: savedObjectsClient, - }); - const result = await store.update(task); expect(savedObjectsClient.update).toHaveBeenCalledWith( @@ -905,28 +963,116 @@ if (doc['task.runAt'].size()!=0) { version: '123', }); }); + + test('pushes error from saved objects client to errors$', async () => { + const task = { + runAt: mockedDate, + scheduledAt: mockedDate, + startedAt: null, + retryAt: null, + id: 'task:324242', + params: { hello: 'world' }, + state: { foo: 'bar' }, + taskType: 'report', + attempts: 3, + status: 'idle' as TaskStatus, + version: '123', + ownerId: null, + }; + + const firstErrorPromise = store.errors$.pipe(first()).toPromise(); + savedObjectsClient.update.mockRejectedValue(new Error('Failure')); + await expect(store.update(task)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`); + expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`); + }); + }); + + describe('bulkUpdate', () => { + let store: TaskStore; + + beforeAll(() => { + store = new TaskStore({ + index: 'tasky', + taskManagerId: '', + serializer, + callCluster: jest.fn(), + maxAttempts: 2, + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, + }); + }); + + test('pushes error from saved objects client to errors$', async () => { + const task = { + runAt: mockedDate, + scheduledAt: mockedDate, + startedAt: null, + retryAt: null, + id: 'task:324242', + params: { hello: 'world' }, + state: { foo: 'bar' }, + taskType: 'report', + attempts: 3, + status: 'idle' as TaskStatus, + version: '123', + ownerId: null, + }; + + const firstErrorPromise = store.errors$.pipe(first()).toPromise(); + savedObjectsClient.bulkUpdate.mockRejectedValue(new Error('Failure')); + await expect(store.bulkUpdate([task])).rejects.toThrowErrorMatchingInlineSnapshot( + `"Failure"` + ); + expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`); + }); }); describe('remove', () => { - test('removes the task with the specified id', async () => { - const id = `id-${_.random(1, 20)}`; - const callCluster = jest.fn(); - const store = new TaskStore({ + let store: TaskStore; + + beforeAll(() => { + store = new TaskStore({ index: 'tasky', taskManagerId: '', serializer, - callCluster, + callCluster: jest.fn(), maxAttempts: 2, definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, }); + }); + + test('removes the task with the specified id', async () => { + const id = `id-${_.random(1, 20)}`; const result = await store.remove(id); expect(result).toBeUndefined(); expect(savedObjectsClient.delete).toHaveBeenCalledWith('task', id); }); + + test('pushes error from saved objects client to errors$', async () => { + const id = `id-${_.random(1, 20)}`; + const firstErrorPromise = store.errors$.pipe(first()).toPromise(); + savedObjectsClient.delete.mockRejectedValue(new Error('Failure')); + await expect(store.remove(id)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`); + expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`); + }); }); describe('get', () => { + let store: TaskStore; + + beforeAll(() => { + store = new TaskStore({ + index: 'tasky', + taskManagerId: '', + serializer, + callCluster: jest.fn(), + maxAttempts: 2, + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, + }); + }); + test('gets the task with the specified id', async () => { const id = `id-${_.random(1, 20)}`; const task = { @@ -944,7 +1090,6 @@ if (doc['task.runAt'].size()!=0) { ownerId: null, }; - const callCluster = jest.fn(); savedObjectsClient.get.mockImplementation(async (type: string, objectId: string) => ({ id: objectId, type, @@ -956,22 +1101,20 @@ if (doc['task.runAt'].size()!=0) { version: '123', })); - const store = new TaskStore({ - index: 'tasky', - taskManagerId: '', - serializer, - callCluster, - maxAttempts: 2, - definitions: taskDefinitions, - savedObjectsRepository: savedObjectsClient, - }); - const result = await store.get(id); expect(result).toEqual(task); expect(savedObjectsClient.get).toHaveBeenCalledWith('task', id); }); + + test('pushes error from saved objects client to errors$', async () => { + const id = `id-${_.random(1, 20)}`; + const firstErrorPromise = store.errors$.pipe(first()).toPromise(); + savedObjectsClient.get.mockRejectedValue(new Error('Failure')); + await expect(store.get(id)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`); + expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`); + }); }); describe('getLifecycle', () => { diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index f2da41053e6ab..4f193c9401cf9 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -121,6 +121,7 @@ export class TaskStore { public readonly maxAttempts: number; public readonly index: string; public readonly taskManagerId: string; + public readonly errors$ = new Subject(); private callCluster: ElasticJs; private definitions: TaskDictionary; @@ -171,11 +172,17 @@ export class TaskStore { ); } - const savedObject = await this.savedObjectsRepository.create( - 'task', - taskInstanceToAttributes(taskInstance), - { id: taskInstance.id, refresh: false } - ); + let savedObject; + try { + savedObject = await this.savedObjectsRepository.create( + 'task', + taskInstanceToAttributes(taskInstance), + { id: taskInstance.id, refresh: false } + ); + } catch (e) { + this.errors$.next(e); + throw e; + } return savedObjectToConcreteTaskInstance(savedObject); } @@ -333,12 +340,22 @@ export class TaskStore { */ public async update(doc: ConcreteTaskInstance): Promise { const attributes = taskInstanceToAttributes(doc); - const updatedSavedObject = await this.savedObjectsRepository.update< - SerializedConcreteTaskInstance - >('task', doc.id, attributes, { - refresh: false, - version: doc.version, - }); + + let updatedSavedObject; + try { + updatedSavedObject = await this.savedObjectsRepository.update( + 'task', + doc.id, + attributes, + { + refresh: false, + version: doc.version, + } + ); + } catch (e) { + this.errors$.next(e); + throw e; + } return savedObjectToConcreteTaskInstance( // The SavedObjects update api forces a Partial on the `attributes` on the response, @@ -362,8 +379,11 @@ export class TaskStore { return attrsById; }, new Map()); - const updatedSavedObjects: Array = ( - await this.savedObjectsRepository.bulkUpdate( + let updatedSavedObjects: Array; + try { + ({ saved_objects: updatedSavedObjects } = await this.savedObjectsRepository.bulkUpdate< + SerializedConcreteTaskInstance + >( docs.map((doc) => ({ type: 'task', id: doc.id, @@ -373,8 +393,11 @@ export class TaskStore { { refresh: false, } - ) - ).saved_objects; + )); + } catch (e) { + this.errors$.next(e); + throw e; + } return updatedSavedObjects.map((updatedSavedObject, index) => isSavedObjectsUpdateResponse(updatedSavedObject) @@ -404,7 +427,12 @@ export class TaskStore { * @returns {Promise} */ public async remove(id: string): Promise { - await this.savedObjectsRepository.delete('task', id); + try { + await this.savedObjectsRepository.delete('task', id); + } catch (e) { + this.errors$.next(e); + throw e; + } } /** @@ -414,7 +442,14 @@ export class TaskStore { * @returns {Promise} */ public async get(id: string): Promise { - return savedObjectToConcreteTaskInstance(await this.savedObjectsRepository.get('task', id)); + let result; + try { + result = await this.savedObjectsRepository.get('task', id); + } catch (e) { + this.errors$.next(e); + throw e; + } + return savedObjectToConcreteTaskInstance(result); } /** @@ -438,14 +473,20 @@ export class TaskStore { private async search(opts: SearchOpts = {}): Promise { const { query } = ensureQueryOnlyReturnsTaskObjects(opts); - const result = await this.callCluster('search', { - index: this.index, - ignoreUnavailable: true, - body: { - ...opts, - query, - }, - }); + let result; + try { + result = await this.callCluster('search', { + index: this.index, + ignoreUnavailable: true, + body: { + ...opts, + query, + }, + }); + } catch (e) { + this.errors$.next(e); + throw e; + } const rawDocs = (result as SearchResponse).hits.hits; @@ -463,17 +504,23 @@ export class TaskStore { { max_docs }: UpdateByQueryOpts = {} ): Promise { const { query } = ensureQueryOnlyReturnsTaskObjects(opts); - const result = await this.callCluster('updateByQuery', { - index: this.index, - ignoreUnavailable: true, - refresh: true, - max_docs, - conflicts: 'proceed', - body: { - ...opts, - query, - }, - }); + let result; + try { + result = await this.callCluster('updateByQuery', { + index: this.index, + ignoreUnavailable: true, + refresh: true, + max_docs, + conflicts: 'proceed', + body: { + ...opts, + query, + }, + }); + } catch (e) { + this.errors$.next(e); + throw e; + } // eslint-disable-next-line @typescript-eslint/naming-convention const { total, updated, version_conflicts } = result as UpdateDocumentByQueryResponse;