Skip to content

Commit

Permalink
added missing unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gmmorris committed Jul 13, 2020
1 parent 19246f5 commit dcc2312
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 41 deletions.
82 changes: 82 additions & 0 deletions x-pack/plugins/task_manager/server/buffered_task_store.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import uuid from 'uuid';
import { taskStoreMock } from './task_store.mock';
import { BufferedTaskStore } from './buffered_task_store';
import { asErr, asOk } from './lib/result_type';
import { TaskStatus } from './task';

describe('Buffered Task Store', () => {
test('proxies the TaskStore for `maxAttempts` and `remove`', async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
taskStore.bulkUpdate.mockResolvedValue([]);
const bufferedStore = new BufferedTaskStore(taskStore);

expect(bufferedStore.maxAttempts).toEqual(10);

bufferedStore.remove('1');
expect(taskStore.remove).toHaveBeenCalledWith('1');
});

describe('update', () => {
test("proxies the TaskStore's `bulkUpdate`", async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
const bufferedStore = new BufferedTaskStore(taskStore);

const task = mockTask();

taskStore.bulkUpdate.mockResolvedValue([asOk(task)]);

expect(await bufferedStore.update(task)).toMatchObject(task);
expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task]);
});

test('handles partially successfull bulkUpdates resolving each call appropriately', async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
const bufferedStore = new BufferedTaskStore(taskStore);

const tasks = [mockTask(), mockTask(), mockTask()];

taskStore.bulkUpdate.mockResolvedValueOnce([
asOk(tasks[0]),
asErr(new Error('Oh no, something went terribly wrong')),
asOk(tasks[2]),
]);

const results = [
bufferedStore.update(tasks[0]),
bufferedStore.update(tasks[1]),
bufferedStore.update(tasks[2]),
];
expect(await results[0]).toMatchObject(tasks[0]);
expect(results[1]).rejects.toMatchInlineSnapshot(
`[Error: Oh no, something went terribly wrong]`
);
expect(await results[2]).toMatchObject(tasks[2]);
});
});
});

function mockTask() {
return {
id: `task_${uuid.v4()}`,
attempts: 0,
schedule: undefined,
params: { hello: 'world' },
retryAt: null,
runAt: new Date(),
scheduledAt: new Date(),
scope: undefined,
startedAt: null,
state: { foo: 'bar' },
status: TaskStatus.Idle,
taskType: 'report',
user: undefined,
version: '123',
ownerId: '123',
};
}
31 changes: 18 additions & 13 deletions x-pack/plugins/task_manager/server/buffered_task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,27 @@ import { TaskStore } from './task_store';
import { ConcreteTaskInstance } from './task';
import { Updatable } from './task_runner';
import { createBuffer, Operation } from './lib/bulk_operation_buffer';
import { unwrapPromise, mapErr } from './lib/result_type';
import { unwrapPromise, asErr, mapErr } from './lib/result_type';

export class BufferedTaskStore implements Updatable {
private bufferedUpdate: Operation<ConcreteTaskInstance, Error>;
private bufferedUpdate: Operation<ConcreteTaskInstance, ConcreteTaskInstance, Error>;
constructor(private readonly taskStore: TaskStore) {
this.bufferedUpdate = createBuffer<ConcreteTaskInstance, Error>(async (docs) => {
return (await taskStore.bulkUpdate(docs)).map((entityOrError, index) =>
mapErr(
(error: Error) => ({
entity: docs[index],
error,
}),
entityOrError
)
);
});
this.bufferedUpdate = createBuffer<ConcreteTaskInstance, ConcreteTaskInstance, Error>(
async (docs) => {
return (await taskStore.bulkUpdate(docs)).map((entityOrError, index) =>
mapErr(
(error: Error) =>
asErr({
// TaskStore's bulkUpdate maintains the order of the docs
// so we can rely on the index in the `docs` to match an entity with an index
entity: docs[index],
error,
}),
entityOrError
)
);
}
);
}

public get maxAttempts(): number {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ function errorAttempts(task: TaskInstance): Err<OperationError<TaskInstance, Err
});
}

describe('Task Store Buffer', () => {
describe('Bulk Operation Buffer', () => {
describe('createBuffer()', () => {
test('batches up multiple Operation calls', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, TaskInstance, Error>> = jest.fn(
([task1, task2]) => {
return Promise.resolve([incrementAttempts(task1), incrementAttempts(task2)]);
}
Expand All @@ -55,9 +55,11 @@ describe('Task Store Buffer', () => {
});

test('batch updates are executed at most by the next Event Loop tick', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn((tasks) => {
return Promise.resolve(tasks.map(incrementAttempts));
});
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, TaskInstance, Error>> = jest.fn(
(tasks) => {
return Promise.resolve(tasks.map(incrementAttempts));
}
);

const bufferedUpdate = createBuffer(bulkUpdate);

Expand Down Expand Up @@ -97,7 +99,7 @@ describe('Task Store Buffer', () => {
});

test('handles both resolutions and rejections at individual task level', async (done) => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, TaskInstance, Error>> = jest.fn(
([task1, task2, task3]) => {
return Promise.resolve([
incrementAttempts(task1),
Expand Down Expand Up @@ -129,9 +131,11 @@ describe('Task Store Buffer', () => {
});

test('handles bulkUpdate failure', async (done) => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(() => {
return Promise.reject(new Error('bulkUpdate is an illusion'));
});
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, TaskInstance, Error>> = jest.fn(
() => {
return Promise.reject(new Error('bulkUpdate is an illusion'));
}
);

const bufferedUpdate = createBuffer(bulkUpdate);

Expand Down
35 changes: 21 additions & 14 deletions x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,31 @@ export interface Entity {
id: string;
}

export interface OperationError<H, E> {
entity: H;
error: E;
export interface OperationError<Input, ErrorOutput> {
entity: Input;
error: ErrorOutput;
}

export type OperationResult<H, E> = Result<H, OperationError<H, E>>;
export type OperationResult<Input, Output, ErrorOutput> = Result<
Output,
OperationError<Input, ErrorOutput>
>;

export type Operation<H, E> = (entity: H) => Promise<Result<H, E>>;
export type BulkOperation<H, E> = (entities: H[]) => Promise<Array<OperationResult<H, E>>>;
export type Operation<Input, Output, ErrorOutput> = (
entity: Input
) => Promise<Result<Output, ErrorOutput>>;
export type BulkOperation<Input, Output, ErrorOutput> = (
entities: Input[]
) => Promise<Array<OperationResult<Input, Output, ErrorOutput>>>;

export function createBuffer<H extends Entity, E>(
bulkOperation: BulkOperation<H, E>
): Operation<H, E> {
export function createBuffer<Input extends Entity, Output extends Entity, ErrorOutput>(
bulkOperation: BulkOperation<Input, Output, ErrorOutput>
): Operation<Input, Output, ErrorOutput> {
const flushBuffer = new Subject<void>();
const storeUpdateBuffer = new Subject<{
entity: H;
onSuccess: (entity: Ok<H>) => void;
onFailure: (error: Err<E>) => void;
entity: Input;
onSuccess: (entity: Ok<Output>) => void;
onFailure: (error: Err<ErrorOutput>) => void;
}>();

storeUpdateBuffer
Expand All @@ -48,7 +55,7 @@ export function createBuffer<H extends Entity, E>(
(entity) => {
entityById[entity.id].onSuccess(asOk(entity));
},
({ entity, error }: OperationError<H, E>) => {
({ entity, error }: OperationError<Input, ErrorOutput>) => {
entityById[entity.id].onFailure(asErr(error));
}
)
Expand All @@ -59,7 +66,7 @@ export function createBuffer<H extends Entity, E>(
});
});

return async function (entity: H) {
return async function (entity: Input) {
return new Promise((resolve, reject) => {
// ensure we flush by the end of the "current" event loop tick
setImmediate(() => flushBuffer.next());
Expand Down
19 changes: 14 additions & 5 deletions x-pack/plugins/task_manager/server/lib/result_type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,34 @@ export function asErr<T>(error: T): Err<T> {
};
}

export function isResult<T, E>(maybeResult: unknown): maybeResult is Result<T, E> {
return (
(maybeResult as Result<T, E>)?.tag === 'ok' || (maybeResult as Result<T, E>)?.tag === 'err'
);
}

export function isOk<T, E>(result: Result<T, E>): result is Ok<T> {
return result.tag === 'ok';
return result?.tag === 'ok';
}

export function isErr<T, E>(result: Result<T, E>): result is Err<E> {
return !isOk(result);
}

export async function promiseResult<T, E>(future: Promise<T>): Promise<Result<T, E>> {
export async function promiseResult<T, E>(
future: Promise<T | Result<T, E>>
): Promise<Result<T, E>> {
try {
return asOk(await future);
const result = await future;
return isResult(result) ? result : asOk(result);
} catch (e) {
return asErr(e);
return isResult<T, E>(e) ? e : asErr(e);
}
}

export async function unwrapPromise<T, E>(future: Promise<Result<T, E>>): Promise<T> {
return map(
await future,
await promiseResult(future),
(value: T) => Promise.resolve(value),
(err: E) => Promise.reject(err)
);
Expand Down
31 changes: 31 additions & 0 deletions x-pack/plugins/task_manager/server/task_store.mock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { TaskStore } from './task_store';

interface TaskStoreOptions {
maxAttempts?: number;
index?: string;
taskManagerId?: string;
}
export const taskStoreMock = {
create({ maxAttempts = 0, index = '', taskManagerId = '' }: TaskStoreOptions) {
const mocked = ({
update: jest.fn(),
remove: jest.fn(),
schedule: jest.fn(),
claimAvailableTasks: jest.fn(),
bulkUpdate: jest.fn(),
get: jest.fn(),
getLifecycle: jest.fn(),
fetch: jest.fn(),
maxAttempts,
index,
taskManagerId,
} as unknown) as jest.Mocked<TaskStore>;
return mocked;
},
};

0 comments on commit dcc2312

Please sign in to comment.