Skip to content

Commit

Permalink
[alerting] gracefully handle error in initialization of Alert TaskRun…
Browse files Browse the repository at this point in the history
…ner (elastic#54335)

Prevents an edge cases where Alerts can end up in a zombie state.

1. Decrypting attributes throws an error
2. Fetching an Api Key throws an error
3. Getting Services with user permissions throws an error
  • Loading branch information
gmmorris committed Jan 13, 2020
1 parent 5ffb19b commit dee61dc
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 14 deletions.
8 changes: 8 additions & 0 deletions x-pack/legacy/plugins/alerting/server/lib/result_type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ export interface Err<E> {
}
export type Result<T, E> = Ok<T> | Err<E>;

export type Resultable<T, E> = {
[P in keyof T]: Result<T[P], E>;
};

export function asOk<T>(value: T): Ok<T> {
return {
tag: 'ok',
Expand Down Expand Up @@ -52,3 +56,7 @@ export function map<T, E, Resolution>(
): Resolution {
return isOk(result) ? onOk(result.value) : onErr(result.error);
}

export function resolveErr<T, E>(result: Result<T, E>, onErr: (error: E) => T): T {
return isOk(result) ? result.value : onErr(result.error);
}
Original file line number Diff line number Diff line change
Expand Up @@ -405,4 +405,97 @@ describe('Task Runner', () => {
}
`);
});

test('recovers gracefully when the Alert Task Runner throws an exception when fetching the encrypted attributes', async () => {
encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockImplementation(() => {
throw new Error('OMG');
});

const taskRunner = new TaskRunner(
alertType,
mockedTaskInstance,
taskRunnerFactoryInitializerParams
);

savedObjectsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject);

const runnerResult = await taskRunner.run();

expect(runnerResult).toMatchInlineSnapshot(`
Object {
"runAt": 1970-01-01T00:05:00.000Z,
"state": Object {
"previousStartedAt": 1970-01-01T00:00:00.000Z,
"startedAt": 1969-12-31T23:55:00.000Z,
},
}
`);
});

test('recovers gracefully when the Alert Task Runner throws an exception when getting internal Services', async () => {
taskRunnerFactoryInitializerParams.getServices.mockImplementation(() => {
throw new Error('OMG');
});

const taskRunner = new TaskRunner(
alertType,
mockedTaskInstance,
taskRunnerFactoryInitializerParams
);

savedObjectsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject);
encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '1',
type: 'alert',
attributes: {
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [],
});

const runnerResult = await taskRunner.run();

expect(runnerResult).toMatchInlineSnapshot(`
Object {
"runAt": 1970-01-01T00:05:00.000Z,
"state": Object {
"previousStartedAt": 1970-01-01T00:00:00.000Z,
"startedAt": 1969-12-31T23:55:00.000Z,
},
}
`);
});

test('recovers gracefully when the Alert Task Runner throws an exception when fetching attributes', async () => {
savedObjectsClient.get.mockImplementation(() => {
throw new Error('OMG');
});

const taskRunner = new TaskRunner(
alertType,
mockedTaskInstance,
taskRunnerFactoryInitializerParams
);

encryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '1',
type: 'alert',
attributes: {
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [],
});

const runnerResult = await taskRunner.run();

expect(runnerResult).toMatchInlineSnapshot(`
Object {
"runAt": 1970-01-01T00:05:00.000Z,
"state": Object {
"previousStartedAt": 1970-01-01T00:00:00.000Z,
"startedAt": 1969-12-31T23:55:00.000Z,
},
}
`);
});
});
75 changes: 61 additions & 14 deletions x-pack/legacy/plugins/alerting/server/task_runner/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,17 @@ import { AlertInstance, createAlertInstanceFactory } from '../alert_instance';
import { getNextRunAt } from './get_next_run_at';
import { validateAlertTypeParams } from '../lib';
import { AlertType, RawAlert, IntervalSchedule, Services, State, AlertInfoParams } from '../types';
import { promiseResult, map } from '../lib/result_type';
import { promiseResult, map, Resultable, asOk, asErr, resolveErr } from '../lib/result_type';

type AlertInstances = Record<string, AlertInstance>;

const FALLBACK_RETRY_INTERVAL: IntervalSchedule = { interval: '5m' };

interface AlertTaskRunResult {
state: State;
runAt: Date;
}

export class TaskRunner {
private context: TaskRunnerContext;
private logger: Logger;
Expand Down Expand Up @@ -190,7 +197,7 @@ export class TaskRunner {
};
}

async validateAndRunAlert(
async validateAndExecuteAlert(
services: Services,
apiKey: string | null,
attributes: RawAlert,
Expand All @@ -217,11 +224,9 @@ export class TaskRunner {
);
}

async run() {
async loadAlertAttributesAndRun(): Promise<Resultable<AlertTaskRunResult, Error>> {
const {
params: { alertId, spaceId },
startedAt: previousStartedAt,
state: originalState,
} = this.taskInstance;

const apiKey = await this.getApiKeyForAlertPermissions(alertId, spaceId);
Expand All @@ -233,11 +238,34 @@ export class TaskRunner {
alertId
);

return {
state: await promiseResult<State, Error>(
this.validateAndExecuteAlert(services, apiKey, attributes, references)
),
runAt: asOk(
getNextRunAt(
new Date(this.taskInstance.startedAt!),
// we do not currently have a good way of returning the type
// from SavedObjectsClient, and as we currenrtly require a schedule
// and we only support `interval`, we can cast this safely
attributes.schedule as IntervalSchedule
)
),
};
}

async run(): Promise<AlertTaskRunResult> {
const {
params: { alertId },
startedAt: previousStartedAt,
state: originalState,
} = this.taskInstance;

const { state, runAt } = await errorAsAlertTaskRunResult(this.loadAlertAttributesAndRun());

return {
state: map<State, Error, State>(
await promiseResult<State, Error>(
this.validateAndRunAlert(services, apiKey, attributes, references)
),
state,
(stateUpdates: State) => {
return {
...stateUpdates,
Expand All @@ -252,13 +280,32 @@ export class TaskRunner {
};
}
),
runAt: getNextRunAt(
new Date(this.taskInstance.startedAt!),
// we do not currently have a good way of returning the type
// from SavedObjectsClient, and as we currenrtly require a schedule
// and we only support `interval`, we can cast this safely
attributes.schedule as IntervalSchedule
runAt: resolveErr<Date, Error>(runAt, () =>
getNextRunAt(
new Date(),
// if we fail at this point we wish to recover but don't have access to the Alert's
// attributes, so we'll use a default interval to prevent the underlying task from
// falling into a failed state
FALLBACK_RETRY_INTERVAL
)
),
};
}
}

/**
* If an error is thrown, wrap it in an AlertTaskRunResult
* so that we can treat each field independantly
*/
async function errorAsAlertTaskRunResult(
future: Promise<Resultable<AlertTaskRunResult, Error>>
): Promise<Resultable<AlertTaskRunResult, Error>> {
try {
return await future;
} catch (e) {
return {
state: asErr(e),
runAt: asErr(e),
};
}
}

0 comments on commit dee61dc

Please sign in to comment.