Skip to content

Commit

Permalink
Make the task manager store emit error events (#75679)
Browse files Browse the repository at this point in the history
* Add errors$ observable to the task store

* Add unit tests
  • Loading branch information
mikecote authored Sep 9, 2020
1 parent 32468ad commit d721fea
Show file tree
Hide file tree
Showing 2 changed files with 269 additions and 79 deletions.
229 changes: 186 additions & 43 deletions x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import _ from 'lodash';
import sinon from 'sinon';
import uuid from 'uuid';
import { filter } from 'rxjs/operators';
import { filter, first } from 'rxjs/operators';
import { Option, some, none } from 'fp-ts/lib/Option';

import {
Expand Down Expand Up @@ -66,24 +66,28 @@ const mockedDate = new Date('2019-02-12T21:01:22.479Z');

describe('TaskStore', () => {
describe('schedule', () => {
let store: TaskStore;

beforeAll(() => {
store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster: jest.fn(),
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
});

async function testSchedule(task: unknown) {
const callCluster = jest.fn();
savedObjectsClient.create.mockImplementation(async (type: string, attributes: unknown) => ({
id: 'testid',
type,
attributes,
references: [],
version: '123',
}));
const store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster,
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
const result = await store.schedule(task as TaskInstance);

expect(savedObjectsClient.create).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -176,12 +180,28 @@ describe('TaskStore', () => {
/Unsupported task type "nope"/i
);
});

test('pushes error from saved objects client to errors$', async () => {
const task: TaskInstance = {
id: 'id',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
};

const firstErrorPromise = store.errors$.pipe(first()).toPromise();
savedObjectsClient.create.mockRejectedValue(new Error('Failure'));
await expect(store.schedule(task)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
});

describe('fetch', () => {
async function testFetch(opts?: SearchOpts, hits: unknown[] = []) {
const callCluster = sinon.spy(async (name: string, params?: unknown) => ({ hits: { hits } }));
const store = new TaskStore({
let store: TaskStore;
const callCluster = jest.fn();

beforeAll(() => {
store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
Expand All @@ -190,15 +210,19 @@ describe('TaskStore', () => {
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
});

async function testFetch(opts?: SearchOpts, hits: unknown[] = []) {
callCluster.mockResolvedValue({ hits: { hits } });

const result = await store.fetch(opts);

sinon.assert.calledOnce(callCluster);
sinon.assert.calledWith(callCluster, 'search');
expect(callCluster).toHaveBeenCalledTimes(1);
expect(callCluster).toHaveBeenCalledWith('search', expect.anything());

return {
result,
args: callCluster.args[0][1],
args: callCluster.mock.calls[0][1],
};
}

Expand Down Expand Up @@ -230,6 +254,13 @@ describe('TaskStore', () => {
},
});
});

test('pushes error from call cluster to errors$', async () => {
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
callCluster.mockRejectedValue(new Error('Failure'));
await expect(store.fetch()).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
});

describe('claimAvailableTasks', () => {
Expand Down Expand Up @@ -831,9 +862,46 @@ if (doc['task.runAt'].size()!=0) {
},
]);
});

test('pushes error from saved objects client to errors$', async () => {
const callCluster = jest.fn();
const store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster,
definitions: taskDefinitions,
maxAttempts: 2,
savedObjectsRepository: savedObjectsClient,
});

const firstErrorPromise = store.errors$.pipe(first()).toPromise();
callCluster.mockRejectedValue(new Error('Failure'));
await expect(
store.claimAvailableTasks({
claimOwnershipUntil: new Date(),
size: 10,
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
});

describe('update', () => {
let store: TaskStore;

beforeAll(() => {
store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster: jest.fn(),
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
});

test('refreshes the index, handles versioning', async () => {
const task = {
runAt: mockedDate,
Expand Down Expand Up @@ -862,16 +930,6 @@ if (doc['task.runAt'].size()!=0) {
}
);

const store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster: jest.fn(),
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});

const result = await store.update(task);

expect(savedObjectsClient.update).toHaveBeenCalledWith(
Expand Down Expand Up @@ -905,28 +963,116 @@ if (doc['task.runAt'].size()!=0) {
version: '123',
});
});

test('pushes error from saved objects client to errors$', async () => {
const task = {
runAt: mockedDate,
scheduledAt: mockedDate,
startedAt: null,
retryAt: null,
id: 'task:324242',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
attempts: 3,
status: 'idle' as TaskStatus,
version: '123',
ownerId: null,
};

const firstErrorPromise = store.errors$.pipe(first()).toPromise();
savedObjectsClient.update.mockRejectedValue(new Error('Failure'));
await expect(store.update(task)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
});

describe('bulkUpdate', () => {
let store: TaskStore;

beforeAll(() => {
store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster: jest.fn(),
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
});

test('pushes error from saved objects client to errors$', async () => {
const task = {
runAt: mockedDate,
scheduledAt: mockedDate,
startedAt: null,
retryAt: null,
id: 'task:324242',
params: { hello: 'world' },
state: { foo: 'bar' },
taskType: 'report',
attempts: 3,
status: 'idle' as TaskStatus,
version: '123',
ownerId: null,
};

const firstErrorPromise = store.errors$.pipe(first()).toPromise();
savedObjectsClient.bulkUpdate.mockRejectedValue(new Error('Failure'));
await expect(store.bulkUpdate([task])).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failure"`
);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
});

describe('remove', () => {
test('removes the task with the specified id', async () => {
const id = `id-${_.random(1, 20)}`;
const callCluster = jest.fn();
const store = new TaskStore({
let store: TaskStore;

beforeAll(() => {
store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster,
callCluster: jest.fn(),
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
});

test('removes the task with the specified id', async () => {
const id = `id-${_.random(1, 20)}`;
const result = await store.remove(id);
expect(result).toBeUndefined();
expect(savedObjectsClient.delete).toHaveBeenCalledWith('task', id);
});

test('pushes error from saved objects client to errors$', async () => {
const id = `id-${_.random(1, 20)}`;
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
savedObjectsClient.delete.mockRejectedValue(new Error('Failure'));
await expect(store.remove(id)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
});

describe('get', () => {
let store: TaskStore;

beforeAll(() => {
store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster: jest.fn(),
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});
});

test('gets the task with the specified id', async () => {
const id = `id-${_.random(1, 20)}`;
const task = {
Expand All @@ -944,7 +1090,6 @@ if (doc['task.runAt'].size()!=0) {
ownerId: null,
};

const callCluster = jest.fn();
savedObjectsClient.get.mockImplementation(async (type: string, objectId: string) => ({
id: objectId,
type,
Expand All @@ -956,22 +1101,20 @@ if (doc['task.runAt'].size()!=0) {
version: '123',
}));

const store = new TaskStore({
index: 'tasky',
taskManagerId: '',
serializer,
callCluster,
maxAttempts: 2,
definitions: taskDefinitions,
savedObjectsRepository: savedObjectsClient,
});

const result = await store.get(id);

expect(result).toEqual(task);

expect(savedObjectsClient.get).toHaveBeenCalledWith('task', id);
});

test('pushes error from saved objects client to errors$', async () => {
const id = `id-${_.random(1, 20)}`;
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
savedObjectsClient.get.mockRejectedValue(new Error('Failure'));
await expect(store.get(id)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});
});

describe('getLifecycle', () => {
Expand Down
Loading

0 comments on commit d721fea

Please sign in to comment.