Skip to content

Commit

Permalink
[Alerting] Change execution of alerts from async to sync (#97311) (#1…
Browse files Browse the repository at this point in the history
…06298)

* added ability to run ephemeral tasks

* fixed typing

* added typing on plugin

* WIP

* Fix type issues

* Hook up the ephemeral task into the task runner for actions

* Tasks can now run independently of one another

* Use deferred language

* Refactor taskParams slightly

* Use Promise.all

* Remove deferred logic

* Add config options to limit the amount of tasks executing at once

* Add ephemeral task monitoring

* WIP

* Add single test so far

* Ensure we log after actions have executed

* Remove confusing * 1

* Add logic to ensure we fallback to default enqueueing if the total actions is above the config

* Add additional test

* Fix tests a bit, ensure we log the alerting:actions-execute right away and the tests should listen for alerts:execute

* Better tests

* If the queue is at capacity, attempt to execute the ephemeral task as a regular action

* Ensure we run ephemeral tasks before to avoid them getting stuck in the queue

* Do not handle the promise anymore

* Remove unnecessary code

* Properly handle errors from ephemeral task lifecycle

* moved acitons domain out of alerting and into actions plugin

* Remove some tests

* Fix TS and test issues

* Fix type issues

* Fix more type issues

* Fix more type issues

* Fix jest tests

* Fix more jest tests

* Off by default

* Fix jest tests

* Update config for this suite too

* Start of telemetry code

* Fix types and add missing files

* Fix telemetry schema

* Fix types

* Fix more types

* moved load event emission to pollingcycle and added health stats on Ephemeral tasks

* Add more telemetry data based on new health metrics for the ephemeral queue

* Fix tests and types

* Add separate request capacity for ephemeral queue

* Fix telemetry schema and add tests for usage collection

* track polled tasks by persistence and use in capacity estimation instead of executions

* fixed typing

* Bump default capacity

* added delay metric to ephemeral stats

* Fix bad merge

* Fix tests

* Fix tests

* Fix types

* Skip failing tests

* Exclude ephemeral stats from capacity estimation tests

* PR feedback

* More PR feedback

* PR feedback

* Fix merge conflict

* Try fixing CI

* Fix broken lock file from merge

* Match master

* Add this back

* PR feedback

* Change to queue and add test

* Disable ephemeral queue in tests

* Updated desc

* Comment out ephemeral-specific tests tha require the entire test suite to support ephemeral tasks

* Add clarifying comment

Co-authored-by: Gidi Meir Morris <[email protected]>
Co-authored-by: Kibana Machine <[email protected]>

Co-authored-by: Gidi Meir Morris <[email protected]>
Co-authored-by: Kibana Machine <[email protected]>
  • Loading branch information
3 people authored Jul 20, 2021
1 parent c0abdcd commit c18ac12
Show file tree
Hide file tree
Showing 78 changed files with 4,510 additions and 843 deletions.
8 changes: 7 additions & 1 deletion docs/settings/alert-action-settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,10 @@ Use `full` to perform hostname verification, `certificate` to skip hostname veri
[[alert-settings]]
==== Alerting settings

You do not need to configure any additional settings to use alerting in {kib}.
[cols="2*<"]
|===

| `xpack.alerting.maxEphemeralActionsPerAlert`
| Sets the number of actions that will be executed ephemerally. To use this, enable ephemeral tasks in task manager first with <<task-manager-settings,`xpack.task_manager.ephemeral_tasks.enabled`>>

|===
8 changes: 8 additions & 0 deletions docs/settings/task-manager-settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ Task Manager runs background tasks by polling for work on an interval. You can
`monitored_stats_health_verbose_log.`
`warn_delayed_task_start_in_seconds`
| The amount of seconds we allow a task to delay before printing a warning server log. Defaults to 60.

| `xpack.task_manager.ephemeral_tasks.enabled`
| Enables an experimental feature that executes a limited (and configurable) number of actions in the same task as the alert which triggered them.
These action tasks will reduce the latency of the time it takes an action to run after it's triggered, but are not persisted as SavedObjects.
These non-persisted action tasks have a risk that they won't be run at all if the Kibana instance running them exits unexpectedly. Defaults to false.

| `xpack.task_manager.ephemeral_tasks.request_capacity`
| Sets the size of the ephemeral queue defined above. Defaults to 10.
|===

[float]
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/actions/server/actions_client.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const createActionsClientMock = () => {
getBulk: jest.fn(),
execute: jest.fn(),
enqueueExecution: jest.fn(),
ephemeralEnqueuedExecution: jest.fn(),
listTypes: jest.fn(),
isActionTypeEnabled: jest.fn(),
};
Expand Down
10 changes: 10 additions & 0 deletions x-pack/plugins/actions/server/actions_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient()
const actionExecutor = actionExecutorMock.create();
const authorization = actionsAuthorizationMock.create();
const executionEnqueuer = jest.fn();
const ephemeralExecutionEnqueuer = jest.fn();
const request = httpServerMock.createKibanaRequest();
const auditLogger = auditServiceMock.create().asScoped(request);

Expand Down Expand Up @@ -77,6 +78,7 @@ beforeEach(() => {
preconfiguredActions: [],
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
request,
authorization: (authorization as unknown) as ActionsAuthorization,
auditLogger,
Expand Down Expand Up @@ -453,6 +455,7 @@ describe('create()', () => {
preconfiguredActions: [],
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
request,
authorization: (authorization as unknown) as ActionsAuthorization,
});
Expand Down Expand Up @@ -553,6 +556,7 @@ describe('get()', () => {
defaultKibanaIndex,
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
request,
authorization: (authorization as unknown) as ActionsAuthorization,
preconfiguredActions: [
Expand Down Expand Up @@ -608,6 +612,7 @@ describe('get()', () => {
defaultKibanaIndex,
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
request,
authorization: (authorization as unknown) as ActionsAuthorization,
preconfiguredActions: [
Expand Down Expand Up @@ -724,6 +729,7 @@ describe('get()', () => {
defaultKibanaIndex,
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
request,
authorization: (authorization as unknown) as ActionsAuthorization,
preconfiguredActions: [
Expand Down Expand Up @@ -793,6 +799,7 @@ describe('getAll()', () => {
defaultKibanaIndex,
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
request,
authorization: (authorization as unknown) as ActionsAuthorization,
preconfiguredActions: [
Expand Down Expand Up @@ -930,6 +937,7 @@ describe('getAll()', () => {
defaultKibanaIndex,
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
request,
authorization: (authorization as unknown) as ActionsAuthorization,
preconfiguredActions: [
Expand Down Expand Up @@ -1005,6 +1013,7 @@ describe('getBulk()', () => {
defaultKibanaIndex,
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
request,
authorization: (authorization as unknown) as ActionsAuthorization,
preconfiguredActions: [
Expand Down Expand Up @@ -1136,6 +1145,7 @@ describe('getBulk()', () => {
defaultKibanaIndex,
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
request,
authorization: (authorization as unknown) as ActionsAuthorization,
preconfiguredActions: [
Expand Down
20 changes: 18 additions & 2 deletions x-pack/plugins/actions/server/actions_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import {
AuthorizationMode,
} from './authorization/get_authorization_mode_by_source';
import { connectorAuditEvent, ConnectorAuditAction } from './lib/audit_events';
import { RunNowResult } from '../../task_manager/server';

// We are assuming there won't be many actions. This is why we will load
// all the actions in advance and assume the total count to not go over 10000.
Expand Down Expand Up @@ -68,7 +69,8 @@ interface ConstructorOptions {
unsecuredSavedObjectsClient: SavedObjectsClientContract;
preconfiguredActions: PreConfiguredAction[];
actionExecutor: ActionExecutorContract;
executionEnqueuer: ExecutionEnqueuer;
executionEnqueuer: ExecutionEnqueuer<void>;
ephemeralExecutionEnqueuer: ExecutionEnqueuer<RunNowResult>;
request: KibanaRequest;
authorization: ActionsAuthorization;
auditLogger?: AuditLogger;
Expand All @@ -88,7 +90,8 @@ export class ActionsClient {
private readonly actionExecutor: ActionExecutorContract;
private readonly request: KibanaRequest;
private readonly authorization: ActionsAuthorization;
private readonly executionEnqueuer: ExecutionEnqueuer;
private readonly executionEnqueuer: ExecutionEnqueuer<void>;
private readonly ephemeralExecutionEnqueuer: ExecutionEnqueuer<RunNowResult>;
private readonly auditLogger?: AuditLogger;

constructor({
Expand All @@ -99,6 +102,7 @@ export class ActionsClient {
preconfiguredActions,
actionExecutor,
executionEnqueuer,
ephemeralExecutionEnqueuer,
request,
authorization,
auditLogger,
Expand All @@ -110,6 +114,7 @@ export class ActionsClient {
this.preconfiguredActions = preconfiguredActions;
this.actionExecutor = actionExecutor;
this.executionEnqueuer = executionEnqueuer;
this.ephemeralExecutionEnqueuer = ephemeralExecutionEnqueuer;
this.request = request;
this.authorization = authorization;
this.auditLogger = auditLogger;
Expand Down Expand Up @@ -497,6 +502,17 @@ export class ActionsClient {
return this.executionEnqueuer(this.unsecuredSavedObjectsClient, options);
}

public async ephemeralEnqueuedExecution(options: EnqueueExecutionOptions): Promise<RunNowResult> {
const { source } = options;
if (
(await getAuthorizationModeBySource(this.unsecuredSavedObjectsClient, source)) ===
AuthorizationMode.RBAC
) {
await this.authorization.ensureAuthorized('execute');
}
return this.ephemeralExecutionEnqueuer(this.unsecuredSavedObjectsClient, options);
}

public async listTypes(): Promise<ActionType[]> {
return this.actionTypeRegistry.list();
}
Expand Down
78 changes: 61 additions & 17 deletions x-pack/plugins/actions/server/create_execute_function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@
*/

import { SavedObjectsClientContract } from '../../../../src/core/server';
import { TaskManagerStartContract } from '../../task_manager/server';
import { RawAction, ActionTypeRegistryContract, PreConfiguredAction } from './types';
import { RunNowResult, TaskManagerStartContract } from '../../task_manager/server';
import {
RawAction,
ActionTypeRegistryContract,
PreConfiguredAction,
ActionTaskExecutorParams,
} from './types';
import { ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE } from './constants/saved_objects';
import { ExecuteOptions as ActionExecutorOptions } from './lib/action_executor';
import { isSavedObjectExecutionSource } from './lib';
Expand All @@ -27,17 +32,17 @@ export interface ExecuteOptions extends Pick<ActionExecutorOptions, 'params' | '
relatedSavedObjects?: RelatedSavedObjects;
}

export type ExecutionEnqueuer = (
export type ExecutionEnqueuer<T> = (
unsecuredSavedObjectsClient: SavedObjectsClientContract,
options: ExecuteOptions
) => Promise<void>;
) => Promise<T>;

export function createExecutionEnqueuerFunction({
taskManager,
actionTypeRegistry,
isESOCanEncrypt,
preconfiguredActions,
}: CreateExecuteFunctionOptions) {
}: CreateExecuteFunctionOptions): ExecutionEnqueuer<void> {
return async function execute(
unsecuredSavedObjectsClient: SavedObjectsClientContract,
{ id, params, spaceId, source, apiKey, relatedSavedObjects }: ExecuteOptions
Expand All @@ -48,18 +53,10 @@ export function createExecutionEnqueuerFunction({
);
}

const { actionTypeId, name, isMissingSecrets } = await getAction(
unsecuredSavedObjectsClient,
preconfiguredActions,
id
);

if (isMissingSecrets) {
throw new Error(
`Unable to execute action because no secrets are defined for the "${name}" connector.`
);
}
const action = await getAction(unsecuredSavedObjectsClient, preconfiguredActions, id);
validateCanActionBeUsed(action);

const { actionTypeId } = action;
if (!actionTypeRegistry.isActionExecutable(id, actionTypeId, { notifyUsage: true })) {
actionTypeRegistry.ensureActionTypeEnabled(actionTypeId);
}
Expand All @@ -76,7 +73,7 @@ export function createExecutionEnqueuerFunction({
);

await taskManager.schedule({
taskType: `actions:${actionTypeId}`,
taskType: `actions:${action.actionTypeId}`,
params: {
spaceId,
actionTaskParamsId: actionTaskParamsRecord.id,
Expand All @@ -87,6 +84,53 @@ export function createExecutionEnqueuerFunction({
};
}

export function createEphemeralExecutionEnqueuerFunction({
taskManager,
actionTypeRegistry,
preconfiguredActions,
}: CreateExecuteFunctionOptions): ExecutionEnqueuer<RunNowResult> {
return async function execute(
unsecuredSavedObjectsClient: SavedObjectsClientContract,
{ id, params, spaceId, source, apiKey }: ExecuteOptions
): Promise<RunNowResult> {
const action = await getAction(unsecuredSavedObjectsClient, preconfiguredActions, id);
validateCanActionBeUsed(action);

const { actionTypeId } = action;
if (!actionTypeRegistry.isActionExecutable(id, actionTypeId, { notifyUsage: true })) {
actionTypeRegistry.ensureActionTypeEnabled(actionTypeId);
}

const taskParams: ActionTaskExecutorParams = {
spaceId,
taskParams: {
actionId: id,
// Saved Objects won't allow us to enforce unknown rather than any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
params: params as Record<string, any>,
...(apiKey ? { apiKey } : {}),
},
...executionSourceAsSavedObjectReferences(source),
};

return taskManager.ephemeralRunNow({
taskType: `actions:${action.actionTypeId}`,
params: taskParams,
state: {},
scope: ['actions'],
});
};
}

function validateCanActionBeUsed(action: PreConfiguredAction | RawAction) {
const { name, isMissingSecrets } = action;
if (isMissingSecrets) {
throw new Error(
`Unable to execute action because no secrets are defined for the "${name}" connector.`
);
}
}

function executionSourceAsSavedObjectReferences(executionSource: ActionExecutorOptions['source']) {
return isSavedObjectExecutionSource(executionSource)
? {
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugins/actions/server/lib/action_executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export interface TaskInfo {

export interface ExecuteOptions<Source = unknown> {
actionId: string;
isEphemeral?: boolean;
request: KibanaRequest;
params: Record<string, unknown>;
source?: ActionExecutionSource<Source>;
Expand Down Expand Up @@ -79,6 +80,7 @@ export class ActionExecutor {
params,
request,
source,
isEphemeral,
taskInfo,
relatedSavedObjects,
}: ExecuteOptions): Promise<ActionTypeExecutorResult<unknown>> {
Expand Down Expand Up @@ -207,6 +209,7 @@ export class ActionExecutor {
params: validatedParams,
config: validatedConfig,
secrets: validatedSecrets,
isEphemeral,
});
} catch (err) {
rawResult = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ test('executes the task by calling the executor with proper parameters', async (

expect(mockedActionExecutor.execute).toHaveBeenCalledWith({
actionId: '2',
isEphemeral: false,
params: { baz: true },
relatedSavedObjects: [],
request: expect.objectContaining({
Expand Down Expand Up @@ -250,6 +251,7 @@ test('uses API key when provided', async () => {

expect(mockedActionExecutor.execute).toHaveBeenCalledWith({
actionId: '2',
isEphemeral: false,
params: { baz: true },
relatedSavedObjects: [],
request: expect.objectContaining({
Expand Down Expand Up @@ -293,6 +295,7 @@ test('uses relatedSavedObjects when provided', async () => {

expect(mockedActionExecutor.execute).toHaveBeenCalledWith({
actionId: '2',
isEphemeral: false,
params: { baz: true },
relatedSavedObjects: [
{
Expand Down Expand Up @@ -334,14 +337,15 @@ test('sanitizes invalid relatedSavedObjects when provided', async () => {
await taskRunner.run();
expect(mockedActionExecutor.execute).toHaveBeenCalledWith({
actionId: '2',
isEphemeral: false,
params: { baz: true },
relatedSavedObjects: [],
request: expect.objectContaining({
headers: {
// base64 encoded "123:abc"
authorization: 'ApiKey MTIzOmFiYw==',
},
}),
relatedSavedObjects: [],
taskInfo: {
scheduled: new Date(),
},
Expand Down Expand Up @@ -369,6 +373,7 @@ test(`doesn't use API key when not provided`, async () => {

expect(mockedActionExecutor.execute).toHaveBeenCalledWith({
actionId: '2',
isEphemeral: false,
params: { baz: true },
relatedSavedObjects: [],
request: expect.objectContaining({
Expand Down
Loading

0 comments on commit c18ac12

Please sign in to comment.