diff --git a/docs/settings/alert-action-settings.asciidoc b/docs/settings/alert-action-settings.asciidoc index 599e8c54643ce..38a73ec92313f 100644 --- a/docs/settings/alert-action-settings.asciidoc +++ b/docs/settings/alert-action-settings.asciidoc @@ -191,3 +191,6 @@ Specifies the default timeout for the all rule types tasks. The time is formatte `[ms,s,m,h,d,w,M,Y]` + For example, `20m`, `24h`, `7d`, `1w`. Default: `60s`. + +`xpack.alerting.cancelAlertsOnRuleTimeout`:: +Specifies whether to skip writing alerts and scheduling actions if rule execution is cancelled due to timeout. Default: `true`. This setting can be overridden by individual rule types. \ No newline at end of file diff --git a/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker b/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker index a9a54bf6794b2..8ce8e2cb40700 100755 --- a/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker +++ b/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker @@ -199,6 +199,7 @@ kibana_vars=( xpack.alerting.invalidateApiKeysTask.interval xpack.alerting.invalidateApiKeysTask.removalDelay xpack.alerting.defaultRuleTaskTimeout + xpack.alerting.cancelAlertsOnRuleTimeout xpack.alerts.healthCheck.interval xpack.alerts.invalidateApiKeysTask.interval xpack.alerts.invalidateApiKeysTask.removalDelay diff --git a/x-pack/plugins/alerting/README.md b/x-pack/plugins/alerting/README.md index 9c4f27fa945be..3646dbddb347d 100644 --- a/x-pack/plugins/alerting/README.md +++ b/x-pack/plugins/alerting/README.md @@ -96,6 +96,7 @@ The following table describes the properties of the `options` object. |producer|The id of the application producing this rule type.|string| |minimumLicenseRequired|The value of a minimum license. Most of the rules are licensed as "basic".|string| |ruleTaskTimeout|The length of time a rule can run before being cancelled due to timeout. By default, this value is "5m".|string| +|cancelAlertsOnRuleTimeout|Whether to skip writing alerts and scheduling actions if a rule execution is cancelled due to timeout. By default, this value is set to "true".|boolean| |useSavedObjectReferences.extractReferences|(Optional) When developing a rule type, you can choose to implement hooks for extracting saved object references from rule parameters. This hook will be invoked when a rule is created or updated. Implementing this hook is optional, but if an extract hook is implemented, an inject hook must also be implemented.|Function |useSavedObjectReferences.injectReferences|(Optional) When developing a rule type, you can choose to implement hooks for injecting saved object references into rule parameters. This hook will be invoked when a rule is retrieved (get or find). 
Implementing this hook is optional, but if an inject hook is implemented, an extract hook must also be implemented.|Function |isExportable|Whether the rule type is exportable from the Saved Objects Management UI.|boolean| diff --git a/x-pack/plugins/alerting/common/alert.ts b/x-pack/plugins/alerting/common/alert.ts index bf0c8e382c9d4..4431f185ac9ca 100644 --- a/x-pack/plugins/alerting/common/alert.ts +++ b/x-pack/plugins/alerting/common/alert.ts @@ -30,6 +30,7 @@ export enum AlertExecutionStatusErrorReasons { Execute = 'execute', Unknown = 'unknown', License = 'license', + Timeout = 'timeout', } export interface AlertExecutionStatus { diff --git a/x-pack/plugins/alerting/server/config.test.ts b/x-pack/plugins/alerting/server/config.test.ts index 63d93b9d67769..a96612beac412 100644 --- a/x-pack/plugins/alerting/server/config.test.ts +++ b/x-pack/plugins/alerting/server/config.test.ts @@ -12,6 +12,7 @@ describe('config validation', () => { const config: Record<string, unknown> = {}; expect(configSchema.validate(config)).toMatchInlineSnapshot(` Object { + "cancelAlertsOnRuleTimeout": true, "defaultRuleTaskTimeout": "5m", "healthCheck": Object { "interval": "60m", diff --git a/x-pack/plugins/alerting/server/config.ts b/x-pack/plugins/alerting/server/config.ts index 277f0c7297df9..8b1b664534379 100644 --- a/x-pack/plugins/alerting/server/config.ts +++ b/x-pack/plugins/alerting/server/config.ts @@ -21,6 +21,7 @@ export const configSchema = schema.object({ defaultValue: DEFAULT_MAX_EPHEMERAL_ACTIONS_PER_ALERT, }), defaultRuleTaskTimeout: schema.string({ validate: validateDurationSchema, defaultValue: '5m' }), + cancelAlertsOnRuleTimeout: schema.boolean({ defaultValue: true }), }); export type AlertsConfig = TypeOf<typeof configSchema>; diff --git a/x-pack/plugins/alerting/server/plugin.test.ts b/x-pack/plugins/alerting/server/plugin.test.ts index 6419a3ccc5c90..a8da891a3dd14 100644 --- a/x-pack/plugins/alerting/server/plugin.test.ts +++ b/x-pack/plugins/alerting/server/plugin.test.ts @@ -39,6 +39,7 @@ describe('Alerting Plugin', () => { }, maxEphemeralActionsPerAlert: 10, defaultRuleTaskTimeout: '5m', + cancelAlertsOnRuleTimeout: true, }); plugin = new AlertingPlugin(context); @@ -73,6 +74,7 @@ describe('Alerting Plugin', () => { }, maxEphemeralActionsPerAlert: 10, defaultRuleTaskTimeout: '5m', + cancelAlertsOnRuleTimeout: true, }); plugin = new AlertingPlugin(context); @@ -145,7 +147,7 @@ describe('Alerting Plugin', () => { }); }); - it('should apply default config value for ruleTaskTimeout', async () => { + it('should apply default config value for ruleTaskTimeout if no value is specified', async () => { const ruleType = { ...sampleAlertType, minimumLicenseRequired: 'basic', @@ -153,6 +155,35 @@ describe('Alerting Plugin', () => { await setup.registerType(ruleType); expect(ruleType.ruleTaskTimeout).toBe('5m'); }); + + it('should apply value for ruleTaskTimeout if specified', async () => { + const ruleType = { + ...sampleAlertType, + minimumLicenseRequired: 'basic', + ruleTaskTimeout: '20h', + } as AlertType; + await setup.registerType(ruleType); + expect(ruleType.ruleTaskTimeout).toBe('20h'); + }); + + it('should apply default config value for cancelAlertsOnRuleTimeout if no value is specified', async () => { + const ruleType = { + ...sampleAlertType, + minimumLicenseRequired: 'basic', + } as AlertType; + await setup.registerType(ruleType); + expect(ruleType.cancelAlertsOnRuleTimeout).toBe(true); + }); + + it('should apply value for cancelAlertsOnRuleTimeout if specified', async () => { + const ruleType = { + ...sampleAlertType, +
minimumLicenseRequired: 'basic', + cancelAlertsOnRuleTimeout: false, + } as AlertType; + await setup.registerType(ruleType); + expect(ruleType.cancelAlertsOnRuleTimeout).toBe(false); + }); }); }); @@ -169,6 +200,7 @@ describe('Alerting Plugin', () => { }, maxEphemeralActionsPerAlert: 10, defaultRuleTaskTimeout: '5m', + cancelAlertsOnRuleTimeout: true, }); const plugin = new AlertingPlugin(context); @@ -210,6 +242,7 @@ describe('Alerting Plugin', () => { }, maxEphemeralActionsPerAlert: 10, defaultRuleTaskTimeout: '5m', + cancelAlertsOnRuleTimeout: true, }); const plugin = new AlertingPlugin(context); @@ -265,6 +298,7 @@ describe('Alerting Plugin', () => { }, maxEphemeralActionsPerAlert: 100, defaultRuleTaskTimeout: '5m', + cancelAlertsOnRuleTimeout: true, }); const plugin = new AlertingPlugin(context); diff --git a/x-pack/plugins/alerting/server/plugin.ts b/x-pack/plugins/alerting/server/plugin.ts index bd3eab19d220d..8be96170e664a 100644 --- a/x-pack/plugins/alerting/server/plugin.ts +++ b/x-pack/plugins/alerting/server/plugin.ts @@ -75,6 +75,7 @@ export const EVENT_LOG_ACTIONS = { newInstance: 'new-instance', recoveredInstance: 'recovered-instance', activeInstance: 'active-instance', + executeTimeout: 'execute-timeout', }; export const LEGACY_EVENT_LOG_ACTIONS = { resolvedInstance: 'resolved-instance', @@ -285,14 +286,13 @@ export class AlertingPlugin { if (!(alertType.minimumLicenseRequired in LICENSE_TYPE)) { throw new Error(`"${alertType.minimumLicenseRequired}" is not a valid license type`); } - if (!alertType.ruleTaskTimeout) { - alertingConfig.then((config) => { - alertType.ruleTaskTimeout = config.defaultRuleTaskTimeout; - ruleTypeRegistry.register(alertType); - }); - } else { + + alertingConfig.then((config) => { + alertType.ruleTaskTimeout = alertType.ruleTaskTimeout ?? config.defaultRuleTaskTimeout; + alertType.cancelAlertsOnRuleTimeout = + alertType.cancelAlertsOnRuleTimeout ?? 
config.cancelAlertsOnRuleTimeout; ruleTypeRegistry.register(alertType); - } + }); }, getSecurityHealth: async () => { return await getSecurityHealth( @@ -375,21 +375,24 @@ export class AlertingPlugin { return alertingAuthorizationClientFactory!.create(request); }; - taskRunnerFactory.initialize({ - logger, - getServices: this.getServicesFactory(core.savedObjects, core.elasticsearch), - getRulesClientWithRequest, - spaceIdToNamespace, - actionsPlugin: plugins.actions, - encryptedSavedObjectsClient, - basePathService: core.http.basePath, - eventLogger: this.eventLogger!, - internalSavedObjectsRepository: core.savedObjects.createInternalRepository(['alert']), - executionContext: core.executionContext, - ruleTypeRegistry: this.ruleTypeRegistry!, - kibanaBaseUrl: this.kibanaBaseUrl, - supportsEphemeralTasks: plugins.taskManager.supportsEphemeralTasks(), - maxEphemeralActionsPerAlert: this.config.then((config) => config.maxEphemeralActionsPerAlert), + this.config.then((config) => { + taskRunnerFactory.initialize({ + logger, + getServices: this.getServicesFactory(core.savedObjects, core.elasticsearch), + getRulesClientWithRequest, + spaceIdToNamespace, + actionsPlugin: plugins.actions, + encryptedSavedObjectsClient, + basePathService: core.http.basePath, + eventLogger: this.eventLogger!, + internalSavedObjectsRepository: core.savedObjects.createInternalRepository(['alert']), + executionContext: core.executionContext, + ruleTypeRegistry: this.ruleTypeRegistry!, + kibanaBaseUrl: this.kibanaBaseUrl, + supportsEphemeralTasks: plugins.taskManager.supportsEphemeralTasks(), + maxEphemeralActionsPerAlert: config.maxEphemeralActionsPerAlert, + cancelAlertsOnRuleTimeout: config.cancelAlertsOnRuleTimeout, + }); }); this.eventLogService!.registerSavedObjectProvider('alert', (request) => { diff --git a/x-pack/plugins/alerting/server/task_runner/create_execution_handler.test.ts b/x-pack/plugins/alerting/server/task_runner/create_execution_handler.test.ts index 244dcb85b13e9..fc5c5cf8897f0 100644 --- a/x-pack/plugins/alerting/server/task_runner/create_execution_handler.test.ts +++ b/x-pack/plugins/alerting/server/task_runner/create_execution_handler.test.ts @@ -99,7 +99,7 @@ const createExecutionHandlerParams: jest.Mocked< stateVal: 'My other {{state.value}} goes here', }, supportsEphemeralTasks: false, - maxEphemeralActionsPerAlert: Promise.resolve(10), + maxEphemeralActionsPerAlert: 10, }; beforeEach(() => { diff --git a/x-pack/plugins/alerting/server/task_runner/create_execution_handler.ts b/x-pack/plugins/alerting/server/task_runner/create_execution_handler.ts index 652e032a1cbb0..d93d8cd6d1312 100644 --- a/x-pack/plugins/alerting/server/task_runner/create_execution_handler.ts +++ b/x-pack/plugins/alerting/server/task_runner/create_execution_handler.ts @@ -56,7 +56,7 @@ export interface CreateExecutionHandlerOptions< request: KibanaRequest; alertParams: AlertTypeParams; supportsEphemeralTasks: boolean; - maxEphemeralActionsPerAlert: Promise; + maxEphemeralActionsPerAlert: number; } interface ExecutionHandlerOptions { @@ -157,7 +157,7 @@ export function createExecutionHandler< const alertLabel = `${alertType.id}:${alertId}: '${alertName}'`; const actionsClient = await actionsPlugin.getActionsClientWithRequest(request); - let ephemeralActionsToSchedule = await maxEphemeralActionsPerAlert; + let ephemeralActionsToSchedule = maxEphemeralActionsPerAlert; for (const action of actions) { if ( !actionsPlugin.isActionExecutable(action.id, action.actionTypeId, { notifyUsage: true }) diff --git 
a/x-pack/plugins/alerting/server/task_runner/task_runner.test.ts b/x-pack/plugins/alerting/server/task_runner/task_runner.test.ts index 07c4d0371c718..f70cbaa13f7d1 100644 --- a/x-pack/plugins/alerting/server/task_runner/task_runner.test.ts +++ b/x-pack/plugins/alerting/server/task_runner/task_runner.test.ts @@ -107,7 +107,8 @@ describe('Task Runner', () => { ruleTypeRegistry, kibanaBaseUrl: 'https://localhost:5601', supportsEphemeralTasks: false, - maxEphemeralActionsPerAlert: new Promise((resolve) => resolve(10)), + maxEphemeralActionsPerAlert: 10, + cancelAlertsOnRuleTimeout: true, }; function testAgainstEphemeralSupport( @@ -285,7 +286,7 @@ describe('Task Runner', () => { expect(call.services).toBeTruthy(); const logger = taskRunnerFactoryInitializerParams.logger; - expect(logger.debug).toHaveBeenCalledTimes(2); + expect(logger.debug).toHaveBeenCalledTimes(3); expect(logger.debug).nthCalledWith(1, 'executing alert test:1 at 1970-01-01T00:00:00.000Z'); expect(logger.debug).nthCalledWith( 2, @@ -432,7 +433,7 @@ describe('Task Runner', () => { `); const logger = customTaskRunnerFactoryInitializerParams.logger; - expect(logger.debug).toHaveBeenCalledTimes(3); + expect(logger.debug).toHaveBeenCalledTimes(4); expect(logger.debug).nthCalledWith(1, 'executing alert test:1 at 1970-01-01T00:00:00.000Z'); expect(logger.debug).nthCalledWith( 2, @@ -648,7 +649,7 @@ describe('Task Runner', () => { expect(actionsClient.ephemeralEnqueuedExecution).toHaveBeenCalledTimes(0); const logger = taskRunnerFactoryInitializerParams.logger; - expect(logger.debug).toHaveBeenCalledTimes(4); + expect(logger.debug).toHaveBeenCalledTimes(5); expect(logger.debug).nthCalledWith(1, 'executing alert test:1 at 1970-01-01T00:00:00.000Z'); expect(logger.debug).nthCalledWith( 2, @@ -848,7 +849,7 @@ describe('Task Runner', () => { expect(enqueueFunction).toHaveBeenCalledTimes(1); const logger = customTaskRunnerFactoryInitializerParams.logger; - expect(logger.debug).toHaveBeenCalledTimes(4); + expect(logger.debug).toHaveBeenCalledTimes(5); expect(logger.debug).nthCalledWith(1, 'executing alert test:1 at 1970-01-01T00:00:00.000Z'); expect(logger.debug).nthCalledWith( 2, @@ -1537,7 +1538,7 @@ describe('Task Runner', () => { `); const logger = customTaskRunnerFactoryInitializerParams.logger; - expect(logger.debug).toHaveBeenCalledTimes(4); + expect(logger.debug).toHaveBeenCalledTimes(5); expect(logger.debug).nthCalledWith(1, 'executing alert test:1 at 1970-01-01T00:00:00.000Z'); expect(logger.debug).nthCalledWith( 2, @@ -4339,7 +4340,7 @@ describe('Task Runner', () => { expect(call.services).toBeTruthy(); const logger = taskRunnerFactoryInitializerParams.logger; - expect(logger.debug).toHaveBeenCalledTimes(2); + expect(logger.debug).toHaveBeenCalledTimes(3); expect(logger.debug).nthCalledWith(1, 'executing alert test:1 at 1970-01-01T00:00:00.000Z'); expect(logger.debug).nthCalledWith( 2, diff --git a/x-pack/plugins/alerting/server/task_runner/task_runner.ts b/x-pack/plugins/alerting/server/task_runner/task_runner.ts index 8b93d3fa17211..f651f41ef0c1e 100644 --- a/x-pack/plugins/alerting/server/task_runner/task_runner.ts +++ b/x-pack/plugins/alerting/server/task_runner/task_runner.ts @@ -82,6 +82,7 @@ export class TaskRunner< private context: TaskRunnerContext; private logger: Logger; private taskInstance: AlertTaskInstance; + private ruleName: string | null; private alertType: NormalizedAlertType< Params, ExtractedParams, @@ -92,6 +93,7 @@ export class TaskRunner< RecoveryActionGroupId >; private readonly ruleTypeRegistry: 
RuleTypeRegistry; + private cancelled: boolean; constructor( alertType: NormalizedAlertType< @@ -109,8 +111,10 @@ export class TaskRunner< this.context = context; this.logger = context.logger; this.alertType = alertType; + this.ruleName = null; this.taskInstance = taskInstanceToAlertTaskInstance(taskInstance); this.ruleTypeRegistry = context.ruleTypeRegistry; + this.cancelled = false; } async getApiKeyForAlertPermissions(alertId: string, spaceId: string) { @@ -201,6 +205,39 @@ export class TaskRunner< }); } + private async updateRuleExecutionStatus( + alertId: string, + namespace: string | undefined, + executionStatus: AlertExecutionStatus + ) { + const client = this.context.internalSavedObjectsRepository; + const attributes = { + executionStatus: alertExecutionStatusToRaw(executionStatus), + }; + + try { + await partiallyUpdateAlert(client, alertId, attributes, { + ignore404: true, + namespace, + refresh: false, + }); + } catch (err) { + this.logger.error( + `error updating rule execution status for ${this.alertType.id}:${alertId} ${err.message}` + ); + } + } + + private shouldLogAndScheduleActionsForAlerts() { + // if execution hasn't been cancelled, return true + if (!this.cancelled) { + return true; + } + + // if execution has been cancelled, return true if EITHER alerting config or rule type indicate to proceed with scheduling actions + return !this.context.cancelAlertsOnRuleTimeout || !this.alertType.cancelAlertsOnRuleTimeout; + } + async executeAlertInstance( alertInstanceId: string, alertInstance: AlertInstance, @@ -355,19 +392,21 @@ export class TaskRunner< recoveredAlerts: recoveredAlertInstances, }); - generateNewAndRecoveredInstanceEvents({ - eventLogger, - originalAlertInstances, - currentAlertInstances: instancesWithScheduledActions, - recoveredAlertInstances, - alertId, - alertLabel, - namespace, - ruleType: alertType, - rule: alert, - }); + if (this.shouldLogAndScheduleActionsForAlerts()) { + generateNewAndRecoveredInstanceEvents({ + eventLogger, + originalAlertInstances, + currentAlertInstances: instancesWithScheduledActions, + recoveredAlertInstances, + alertId, + alertLabel, + namespace, + ruleType: alertType, + rule: alert, + }); + } - if (!muteAll) { + if (!muteAll && this.shouldLogAndScheduleActionsForAlerts()) { const mutedInstanceIdsSet = new Set(mutedInstanceIds); scheduleActionsForRecoveredInstances({ @@ -422,7 +461,14 @@ export class TaskRunner< ) ); } else { - this.logger.debug(`no scheduling of actions for alert ${alertLabel}: alert is muted.`); + if (muteAll) { + this.logger.debug(`no scheduling of actions for alert ${alertLabel}: alert is muted.`); + } + if (!this.shouldLogAndScheduleActionsForAlerts()) { + this.logger.debug( + `no scheduling of actions for alert ${alertLabel}: alert execution has been cancelled.` + ); + } } return { @@ -487,6 +533,8 @@ export class TaskRunner< throw new ErrorWithReason(AlertExecutionStatusErrorReasons.Read, err); } + this.ruleName = alert.name; + try { this.ruleTypeRegistry.ensureRuleTypeEnabled(alert.alertTypeId); } catch (err) { @@ -596,21 +644,13 @@ export class TaskRunner< eventLogger.logEvent(event); - const client = this.context.internalSavedObjectsRepository; - const attributes = { - executionStatus: alertExecutionStatusToRaw(executionStatus), - }; - - try { - await partiallyUpdateAlert(client, alertId, attributes, { - ignore404: true, - namespace, - refresh: false, - }); - } catch (err) { - this.logger.error( - `error updating alert execution status for ${this.alertType.id}:${alertId} ${err.message}` + if 
(!this.cancelled) { + this.logger.debug( + `Updating rule task for ${this.alertType.id} rule with id ${alertId} - ${JSON.stringify( + executionStatus + )}` ); + await this.updateRuleExecutionStatus(alertId, namespace, executionStatus); } return { @@ -646,6 +686,72 @@ export class TaskRunner< }), }; } + + async cancel(): Promise<void> { + if (this.cancelled) { + return; + } + + this.cancelled = true; + + // Write event log entry + const { + params: { alertId, spaceId }, + } = this.taskInstance; + const namespace = this.context.spaceIdToNamespace(spaceId); + + this.logger.debug( + `Cancelling rule type ${this.alertType.id} with id ${alertId} - execution exceeded rule type timeout of ${this.alertType.ruleTaskTimeout}` + ); + + const eventLogger = this.context.eventLogger; + const event: IEvent = { + '@timestamp': new Date().toISOString(), + event: { + action: EVENT_LOG_ACTIONS.executeTimeout, + kind: 'alert', + category: [this.alertType.producer], + }, + message: `rule: ${this.alertType.id}:${alertId}: '${ + this.ruleName ?? '' + }' execution cancelled due to timeout - exceeded rule type timeout of ${ + this.alertType.ruleTaskTimeout + }`, + kibana: { + saved_objects: [ + { + rel: SAVED_OBJECT_REL_PRIMARY, + type: 'alert', + id: alertId, + type_id: this.alertType.id, + namespace, + }, + ], + }, + rule: { + id: alertId, + license: this.alertType.minimumLicenseRequired, + category: this.alertType.id, + ruleset: this.alertType.producer, + ...(this.ruleName ? { name: this.ruleName } : {}), + }, + }; + eventLogger.logEvent(event); + + // Update the rule saved object with execution status + const executionStatus: AlertExecutionStatus = { + lastExecutionDate: new Date(), + status: 'error', + error: { + reason: AlertExecutionStatusErrorReasons.Timeout, + message: `${this.alertType.id}:${alertId}: execution cancelled due to timeout - exceeded rule type timeout of ${this.alertType.ruleTaskTimeout}`, + }, + }; + this.logger.debug( + `Updating rule task for ${this.alertType.id} rule with id ${alertId} - execution error due to timeout` + ); + await this.updateRuleExecutionStatus(alertId, namespace, executionStatus); + } } interface TrackAlertDurationsParams< diff --git a/x-pack/plugins/alerting/server/task_runner/task_runner_cancel.test.ts b/x-pack/plugins/alerting/server/task_runner/task_runner_cancel.test.ts new file mode 100644 index 0000000000000..95cb356af3c1a --- /dev/null +++ b/x-pack/plugins/alerting/server/task_runner/task_runner_cancel.test.ts @@ -0,0 +1,721 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0.
+ */ + +import sinon from 'sinon'; +import { + AlertExecutorOptions, + AlertTypeParams, + AlertTypeState, + AlertInstanceState, + AlertInstanceContext, +} from '../types'; +import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager/server'; +import { TaskRunnerContext } from './task_runner_factory'; +import { TaskRunner } from './task_runner'; +import { encryptedSavedObjectsMock } from '../../../encrypted_saved_objects/server/mocks'; +import { + loggingSystemMock, + savedObjectsRepositoryMock, + httpServiceMock, + executionContextServiceMock, +} from '../../../../../src/core/server/mocks'; +import { PluginStartContract as ActionsPluginStart } from '../../../actions/server'; +import { actionsMock, actionsClientMock } from '../../../actions/server/mocks'; +import { alertsMock, rulesClientMock } from '../mocks'; +import { eventLoggerMock } from '../../../event_log/server/event_logger.mock'; +import { IEventLogger } from '../../../event_log/server'; +import { Alert, RecoveredActionGroup } from '../../common'; +import { UntypedNormalizedAlertType } from '../rule_type_registry'; +import { ruleTypeRegistryMock } from '../rule_type_registry.mock'; + +const ruleType: jest.Mocked<UntypedNormalizedAlertType> = { + id: 'test', + name: 'My test rule', + actionGroups: [{ id: 'default', name: 'Default' }, RecoveredActionGroup], + defaultActionGroupId: 'default', + minimumLicenseRequired: 'basic', + isExportable: true, + recoveryActionGroup: RecoveredActionGroup, + executor: jest.fn(), + producer: 'alerts', + cancelAlertsOnRuleTimeout: true, + ruleTaskTimeout: '5m', +}; + +let fakeTimer: sinon.SinonFakeTimers; + +describe('Task Runner Cancel', () => { + let mockedTaskInstance: ConcreteTaskInstance; + + beforeAll(() => { + fakeTimer = sinon.useFakeTimers(); + mockedTaskInstance = { + id: '', + attempts: 0, + status: TaskStatus.Running, + version: '123', + runAt: new Date(), + schedule: { interval: '10s' }, + scheduledAt: new Date(), + startedAt: new Date(), + retryAt: new Date(Date.now() + 5 * 60 * 1000), + state: {}, + taskType: 'alerting:test', + params: { + alertId: '1', + }, + ownerId: null, + }; + }); + + afterAll(() => fakeTimer.restore()); + + const encryptedSavedObjectsClient = encryptedSavedObjectsMock.createClient(); + const services = alertsMock.createAlertServices(); + const actionsClient = actionsClientMock.create(); + const rulesClient = rulesClientMock.create(); + const ruleTypeRegistry = ruleTypeRegistryMock.create(); + + type TaskRunnerFactoryInitializerParamsType = jest.Mocked<TaskRunnerContext> & { + actionsPlugin: jest.Mocked<ActionsPluginStart>; + eventLogger: jest.Mocked<IEventLogger>; + executionContext: ReturnType<typeof executionContextServiceMock.createInternalStartContract>; + }; + + const taskRunnerFactoryInitializerParams: TaskRunnerFactoryInitializerParamsType = { + getServices: jest.fn().mockReturnValue(services), + actionsPlugin: actionsMock.createStart(), + getRulesClientWithRequest: jest.fn().mockReturnValue(rulesClient), + encryptedSavedObjectsClient, + logger: loggingSystemMock.create().get(), + executionContext: executionContextServiceMock.createInternalStartContract(), + spaceIdToNamespace: jest.fn().mockReturnValue(undefined), + basePathService: httpServiceMock.createBasePath(), + eventLogger: eventLoggerMock.create(), + internalSavedObjectsRepository: savedObjectsRepositoryMock.create(), + ruleTypeRegistry, + kibanaBaseUrl: 'https://localhost:5601', + supportsEphemeralTasks: false, + maxEphemeralActionsPerAlert: 10, + cancelAlertsOnRuleTimeout: true, + }; + + const mockDate = new Date('2019-02-12T21:01:22.479Z'); + + const mockedRuleSavedObject: Alert<AlertTypeParams> = { + id: '1', + consumer: 'bar', +
createdAt: mockDate, + updatedAt: mockDate, + throttle: null, + muteAll: false, + notifyWhen: 'onActiveAlert', + enabled: true, + alertTypeId: ruleType.id, + apiKey: '', + apiKeyOwner: 'elastic', + schedule: { interval: '10s' }, + name: 'rule-name', + tags: ['rule-', '-tags'], + createdBy: 'rule-creator', + updatedBy: 'rule-updater', + mutedInstanceIds: [], + params: { + bar: true, + }, + actions: [ + { + group: 'default', + id: '1', + actionTypeId: 'action', + params: { + foo: true, + }, + }, + { + group: RecoveredActionGroup.id, + id: '2', + actionTypeId: 'action', + params: { + isResolved: true, + }, + }, + ], + executionStatus: { + status: 'unknown', + lastExecutionDate: new Date('2020-08-20T19:23:38Z'), + }, + }; + + beforeEach(() => { + jest.resetAllMocks(); + taskRunnerFactoryInitializerParams.getServices.mockReturnValue(services); + taskRunnerFactoryInitializerParams.getRulesClientWithRequest.mockReturnValue(rulesClient); + taskRunnerFactoryInitializerParams.actionsPlugin.getActionsClientWithRequest.mockResolvedValue( + actionsClient + ); + taskRunnerFactoryInitializerParams.actionsPlugin.renderActionParameterTemplates.mockImplementation( + (actionTypeId, actionId, params) => params + ); + ruleTypeRegistry.get.mockReturnValue(ruleType); + taskRunnerFactoryInitializerParams.executionContext.withContext.mockImplementation((ctx, fn) => + fn() + ); + rulesClient.get.mockResolvedValue(mockedRuleSavedObject); + encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ + id: '1', + type: 'alert', + attributes: { + apiKey: Buffer.from('123:abc').toString('base64'), + }, + references: [], + }); + taskRunnerFactoryInitializerParams.actionsPlugin.isActionTypeEnabled.mockReturnValue(true); + taskRunnerFactoryInitializerParams.actionsPlugin.isActionExecutable.mockReturnValue(true); + }); + + test('updates rule saved object execution status and writes to event log entry when task is cancelled mid-execution', async () => { + const taskRunner = new TaskRunner( + ruleType, + mockedTaskInstance, + taskRunnerFactoryInitializerParams + ); + + const promise = taskRunner.run(); + await Promise.resolve(); + await taskRunner.cancel(); + await promise; + + const eventLogger = taskRunnerFactoryInitializerParams.eventLogger; + // execute-start event, timeout event and then an execute event because rule executors are not cancelling anything yet + expect(eventLogger.logEvent).toHaveBeenCalledTimes(3); + expect(eventLogger.startTiming).toHaveBeenCalledTimes(1); + expect(eventLogger.logEvent).toHaveBeenNthCalledWith(1, { + '@timestamp': '1970-01-01T00:00:00.000Z', + event: { + action: 'execute-start', + category: ['alerts'], + kind: 'alert', + }, + kibana: { + saved_objects: [ + { + id: '1', + rel: 'primary', + type: 'alert', + type_id: 'test', + }, + ], + task: { + schedule_delay: 0, + scheduled: '1970-01-01T00:00:00.000Z', + }, + }, + message: 'alert execution start: "1"', + rule: { + category: 'test', + id: '1', + license: 'basic', + ruleset: 'alerts', + }, + }); + expect(eventLogger.logEvent).toHaveBeenNthCalledWith(2, { + '@timestamp': '1970-01-01T00:00:00.000Z', + event: { + action: 'execute-timeout', + category: ['alerts'], + kind: 'alert', + }, + kibana: { + saved_objects: [ + { + id: '1', + rel: 'primary', + type: 'alert', + type_id: 'test', + }, + ], + }, + message: `rule: test:1: '' execution cancelled due to timeout - exceeded rule type timeout of 5m`, + rule: { + category: 'test', + id: '1', + license: 'basic', + ruleset: 'alerts', + }, + }); + 
expect(eventLogger.logEvent).toHaveBeenNthCalledWith(3, { + '@timestamp': '1970-01-01T00:00:00.000Z', + event: { + action: 'execute', + category: ['alerts'], + kind: 'alert', + outcome: 'success', + }, + kibana: { + alerting: { + status: 'ok', + }, + saved_objects: [ + { + id: '1', + rel: 'primary', + type: 'alert', + type_id: 'test', + }, + ], + task: { + schedule_delay: 0, + scheduled: '1970-01-01T00:00:00.000Z', + }, + }, + message: `alert executed: test:1: 'rule-name'`, + rule: { + category: 'test', + id: '1', + license: 'basic', + name: 'rule-name', + ruleset: 'alerts', + }, + }); + + expect( + taskRunnerFactoryInitializerParams.internalSavedObjectsRepository.update + ).toHaveBeenCalledTimes(1); + expect( + taskRunnerFactoryInitializerParams.internalSavedObjectsRepository.update + ).toHaveBeenCalledWith( + 'alert', + '1', + { + executionStatus: { + error: { + message: `test:1: execution cancelled due to timeout - exceeded rule type timeout of 5m`, + reason: 'timeout', + }, + lastDuration: 0, + lastExecutionDate: '1970-01-01T00:00:00.000Z', + status: 'error', + }, + }, + { refresh: false, namespace: undefined } + ); + }); + + test('actionsPlugin.execute is called if rule execution is cancelled but cancelAlertsOnRuleTimeout from config is false', async () => { + ruleType.executor.mockImplementation( + async ({ + services: executorServices, + }: AlertExecutorOptions< + AlertTypeParams, + AlertTypeState, + AlertInstanceState, + AlertInstanceContext, + string + >) => { + executorServices.alertInstanceFactory('1').scheduleActions('default'); + } + ); + // setting cancelAlertsOnRuleTimeout to false here + const taskRunner = new TaskRunner(ruleType, mockedTaskInstance, { + ...taskRunnerFactoryInitializerParams, + cancelAlertsOnRuleTimeout: false, + }); + + const promise = taskRunner.run(); + await Promise.resolve(); + await taskRunner.cancel(); + await promise; + + testActionsExecute(); + }); + + test('actionsPlugin.execute is called if rule execution is cancelled but cancelAlertsOnRuleTimeout for ruleType is false', async () => { + ruleTypeRegistry.get.mockReturnValue({ + ...ruleType, + cancelAlertsOnRuleTimeout: false, + }); + ruleType.executor.mockImplementation( + async ({ + services: executorServices, + }: AlertExecutorOptions< + AlertTypeParams, + AlertTypeState, + AlertInstanceState, + AlertInstanceContext, + string + >) => { + executorServices.alertInstanceFactory('1').scheduleActions('default'); + } + ); + // setting cancelAlertsOnRuleTimeout for ruleType to false here + const taskRunner = new TaskRunner( + { + ...ruleType, + cancelAlertsOnRuleTimeout: false, + }, + mockedTaskInstance, + taskRunnerFactoryInitializerParams + ); + + const promise = taskRunner.run(); + await Promise.resolve(); + await taskRunner.cancel(); + await promise; + + testActionsExecute(); + }); + + test('actionsPlugin.execute is skipped if rule execution is cancelled and cancelAlertsOnRuleTimeout for both config and ruleType are true', async () => { + ruleType.executor.mockImplementation( + async ({ + services: executorServices, + }: AlertExecutorOptions< + AlertTypeParams, + AlertTypeState, + AlertInstanceState, + AlertInstanceContext, + string + >) => { + executorServices.alertInstanceFactory('1').scheduleActions('default'); + } + ); + const taskRunner = new TaskRunner( + ruleType, + mockedTaskInstance, + taskRunnerFactoryInitializerParams + ); + + const promise = taskRunner.run(); + await Promise.resolve(); + await taskRunner.cancel(); + await promise; + + const logger = 
taskRunnerFactoryInitializerParams.logger; + expect(logger.debug).toHaveBeenCalledTimes(6); + expect(logger.debug).nthCalledWith(1, 'executing alert test:1 at 1970-01-01T00:00:00.000Z'); + expect(logger.debug).nthCalledWith( + 2, + `Cancelling rule type test with id 1 - execution exceeded rule type timeout of 5m` + ); + expect(logger.debug).nthCalledWith( + 3, + `Updating rule task for test rule with id 1 - execution error due to timeout` + ); + expect(logger.debug).nthCalledWith( + 4, + `alert test:1: 'rule-name' has 1 active alert instances: [{\"instanceId\":\"1\",\"actionGroup\":\"default\"}]` + ); + expect(logger.debug).nthCalledWith( + 5, + `no scheduling of actions for alert test:1: 'rule-name': alert execution has been cancelled.` + ); + expect(logger.debug).nthCalledWith( + 6, + 'alertExecutionStatus for test:1: {"lastExecutionDate":"1970-01-01T00:00:00.000Z","status":"active"}' + ); + + const eventLogger = taskRunnerFactoryInitializerParams.eventLogger; + expect(eventLogger.startTiming).toHaveBeenCalledTimes(1); + expect(eventLogger.logEvent).toHaveBeenCalledTimes(3); + expect(eventLogger.logEvent).toHaveBeenNthCalledWith(1, { + '@timestamp': '1970-01-01T00:00:00.000Z', + event: { + action: 'execute-start', + category: ['alerts'], + kind: 'alert', + }, + kibana: { + task: { + schedule_delay: 0, + scheduled: '1970-01-01T00:00:00.000Z', + }, + saved_objects: [ + { + id: '1', + rel: 'primary', + type: 'alert', + type_id: 'test', + }, + ], + }, + message: `alert execution start: \"1\"`, + rule: { + category: 'test', + id: '1', + license: 'basic', + ruleset: 'alerts', + }, + }); + expect(eventLogger.logEvent).toHaveBeenNthCalledWith(2, { + '@timestamp': '1970-01-01T00:00:00.000Z', + event: { + action: 'execute-timeout', + category: ['alerts'], + kind: 'alert', + }, + kibana: { + saved_objects: [ + { + id: '1', + namespace: undefined, + rel: 'primary', + type: 'alert', + type_id: 'test', + }, + ], + }, + message: `rule: test:1: '' execution cancelled due to timeout - exceeded rule type timeout of 5m`, + rule: { + category: 'test', + id: '1', + license: 'basic', + ruleset: 'alerts', + }, + }); + expect(eventLogger.logEvent).toHaveBeenNthCalledWith(3, { + '@timestamp': '1970-01-01T00:00:00.000Z', + event: { + action: 'execute', + category: ['alerts'], + kind: 'alert', + outcome: 'success', + }, + kibana: { + alerting: { + status: 'active', + }, + task: { + schedule_delay: 0, + scheduled: '1970-01-01T00:00:00.000Z', + }, + saved_objects: [ + { + id: '1', + namespace: undefined, + rel: 'primary', + type: 'alert', + type_id: 'test', + }, + ], + }, + message: "alert executed: test:1: 'rule-name'", + rule: { + category: 'test', + id: '1', + license: 'basic', + name: 'rule-name', + ruleset: 'alerts', + }, + }); + }); + + function testActionsExecute() { + const logger = taskRunnerFactoryInitializerParams.logger; + expect(logger.debug).toHaveBeenCalledTimes(5); + expect(logger.debug).nthCalledWith(1, 'executing alert test:1 at 1970-01-01T00:00:00.000Z'); + expect(logger.debug).nthCalledWith( + 2, + `Cancelling rule type test with id 1 - execution exceeded rule type timeout of 5m` + ); + expect(logger.debug).nthCalledWith( + 3, + `Updating rule task for test rule with id 1 - execution error due to timeout` + ); + expect(logger.debug).nthCalledWith( + 4, + `alert test:1: 'rule-name' has 1 active alert instances: [{\"instanceId\":\"1\",\"actionGroup\":\"default\"}]` + ); + expect(logger.debug).nthCalledWith( + 5, + 'alertExecutionStatus for test:1: 
{"lastExecutionDate":"1970-01-01T00:00:00.000Z","status":"active"}' + ); + + const eventLogger = taskRunnerFactoryInitializerParams.eventLogger; + expect(eventLogger.logEvent).toHaveBeenCalledTimes(6); + expect(eventLogger.logEvent).toHaveBeenNthCalledWith(1, { + '@timestamp': '1970-01-01T00:00:00.000Z', + event: { + action: 'execute-start', + category: ['alerts'], + kind: 'alert', + }, + kibana: { + task: { + schedule_delay: 0, + scheduled: '1970-01-01T00:00:00.000Z', + }, + saved_objects: [ + { + id: '1', + namespace: undefined, + rel: 'primary', + type: 'alert', + type_id: 'test', + }, + ], + }, + message: `alert execution start: "1"`, + rule: { + category: 'test', + id: '1', + license: 'basic', + ruleset: 'alerts', + }, + }); + expect(eventLogger.logEvent).toHaveBeenNthCalledWith(2, { + '@timestamp': '1970-01-01T00:00:00.000Z', + event: { + action: 'execute-timeout', + category: ['alerts'], + kind: 'alert', + }, + kibana: { + saved_objects: [ + { + id: '1', + namespace: undefined, + rel: 'primary', + type: 'alert', + type_id: 'test', + }, + ], + }, + message: `rule: test:1: '' execution cancelled due to timeout - exceeded rule type timeout of 5m`, + rule: { + category: 'test', + id: '1', + license: 'basic', + ruleset: 'alerts', + }, + }); + expect(eventLogger.logEvent).toHaveBeenNthCalledWith(3, { + event: { + action: 'new-instance', + category: ['alerts'], + kind: 'alert', + duration: 0, + start: '1970-01-01T00:00:00.000Z', + }, + kibana: { + alerting: { + action_group_id: 'default', + instance_id: '1', + }, + saved_objects: [ + { + id: '1', + namespace: undefined, + rel: 'primary', + type: 'alert', + type_id: 'test', + }, + ], + }, + message: "test:1: 'rule-name' created new instance: '1'", + rule: { + category: 'test', + id: '1', + license: 'basic', + name: 'rule-name', + namespace: undefined, + ruleset: 'alerts', + }, + }); + expect(eventLogger.logEvent).toHaveBeenNthCalledWith(4, { + event: { + action: 'active-instance', + category: ['alerts'], + duration: 0, + kind: 'alert', + start: '1970-01-01T00:00:00.000Z', + }, + kibana: { + alerting: { + action_group_id: 'default', + instance_id: '1', + }, + saved_objects: [ + { id: '1', namespace: undefined, rel: 'primary', type: 'alert', type_id: 'test' }, + ], + }, + message: "test:1: 'rule-name' active instance: '1' in actionGroup: 'default'", + rule: { + category: 'test', + id: '1', + license: 'basic', + name: 'rule-name', + ruleset: 'alerts', + }, + }); + expect(eventLogger.logEvent).toHaveBeenNthCalledWith(5, { + event: { + action: 'execute-action', + category: ['alerts'], + kind: 'alert', + }, + kibana: { + alerting: { + instance_id: '1', + action_group_id: 'default', + }, + saved_objects: [ + { + id: '1', + rel: 'primary', + type: 'alert', + type_id: 'test', + }, + { + id: '1', + type: 'action', + type_id: 'action', + }, + ], + }, + message: + "alert: test:1: 'rule-name' instanceId: '1' scheduled actionGroup: 'default' action: action:1", + rule: { + category: 'test', + id: '1', + license: 'basic', + name: 'rule-name', + ruleset: 'alerts', + }, + }); + expect(eventLogger.logEvent).toHaveBeenNthCalledWith(6, { + '@timestamp': '1970-01-01T00:00:00.000Z', + event: { action: 'execute', category: ['alerts'], kind: 'alert', outcome: 'success' }, + kibana: { + alerting: { + status: 'active', + }, + task: { + schedule_delay: 0, + scheduled: '1970-01-01T00:00:00.000Z', + }, + saved_objects: [ + { + id: '1', + namespace: undefined, + rel: 'primary', + type: 'alert', + type_id: 'test', + }, + ], + }, + message: "alert executed: test:1: 
'rule-name'", + rule: { + category: 'test', + id: '1', + license: 'basic', + name: 'rule-name', + ruleset: 'alerts', + }, + }); + } +}); diff --git a/x-pack/plugins/alerting/server/task_runner/task_runner_factory.test.ts b/x-pack/plugins/alerting/server/task_runner/task_runner_factory.test.ts index d262607958347..b799dd2f4043d 100644 --- a/x-pack/plugins/alerting/server/task_runner/task_runner_factory.test.ts +++ b/x-pack/plugins/alerting/server/task_runner/task_runner_factory.test.ts @@ -83,7 +83,8 @@ describe('Task Runner Factory', () => { ruleTypeRegistry: ruleTypeRegistryMock.create(), kibanaBaseUrl: 'https://localhost:5601', supportsEphemeralTasks: true, - maxEphemeralActionsPerAlert: new Promise((resolve) => resolve(10)), + maxEphemeralActionsPerAlert: 10, + cancelAlertsOnRuleTimeout: true, executionContext, }; diff --git a/x-pack/plugins/alerting/server/task_runner/task_runner_factory.ts b/x-pack/plugins/alerting/server/task_runner/task_runner_factory.ts index 524b779a0d9ac..fc4b8eee89f5e 100644 --- a/x-pack/plugins/alerting/server/task_runner/task_runner_factory.ts +++ b/x-pack/plugins/alerting/server/task_runner/task_runner_factory.ts @@ -44,7 +44,8 @@ export interface TaskRunnerContext { ruleTypeRegistry: RuleTypeRegistry; kibanaBaseUrl: string | undefined; supportsEphemeralTasks: boolean; - maxEphemeralActionsPerAlert: Promise; + maxEphemeralActionsPerAlert: number; + cancelAlertsOnRuleTimeout: boolean; } export class TaskRunnerFactory { diff --git a/x-pack/plugins/alerting/server/types.ts b/x-pack/plugins/alerting/server/types.ts index 82bb94b121840..c1645936c06e9 100644 --- a/x-pack/plugins/alerting/server/types.ts +++ b/x-pack/plugins/alerting/server/types.ts @@ -160,6 +160,7 @@ export interface AlertType< defaultScheduleInterval?: string; minimumScheduleInterval?: string; ruleTaskTimeout?: string; + cancelAlertsOnRuleTimeout?: boolean; } export type UntypedAlertType = AlertType< AlertTypeParams, diff --git a/x-pack/plugins/task_manager/server/task_pool.test.ts b/x-pack/plugins/task_manager/server/task_pool.test.ts index 05eb7bd1b43e1..8b10fb6c1ca3d 100644 --- a/x-pack/plugins/task_manager/server/task_pool.test.ts +++ b/x-pack/plugins/task_manager/server/task_pool.test.ts @@ -357,6 +357,33 @@ describe('TaskPool', () => { ); }); + test('only allows one task with the same id in the task pool', async () => { + const logger = loggingSystemMock.create().get(); + const pool = new TaskPool({ + maxWorkers$: of(2), + logger, + }); + + const shouldRun = mockRun(); + const shouldNotRun = mockRun(); + + const taskId = uuid.v4(); + const task1 = mockTask({ id: taskId, run: shouldRun }); + const task2 = mockTask({ + id: taskId, + run: shouldNotRun, + isSameTask() { + return true; + }, + }); + + await pool.run([task1]); + await pool.run([task2]); + + expect(shouldRun).toHaveBeenCalledTimes(1); + expect(shouldNotRun).not.toHaveBeenCalled(); + }); + function mockRun() { return jest.fn(async () => { await sleep(0); @@ -367,6 +394,7 @@ describe('TaskPool', () => { function mockTask(overrides = {}) { return { isExpired: false, + taskExecutionId: uuid.v4(), id: uuid.v4(), cancel: async () => undefined, markTaskAsRunning: jest.fn(async () => true), @@ -387,6 +415,9 @@ describe('TaskPool', () => { createTaskRunner: jest.fn(), }; }, + isSameTask() { + return false; + }, ...overrides, }; } diff --git a/x-pack/plugins/task_manager/server/task_pool.ts b/x-pack/plugins/task_manager/server/task_pool.ts index 87de0e691d471..d8a9fdd6223fe 100644 --- a/x-pack/plugins/task_manager/server/task_pool.ts +++ 
b/x-pack/plugins/task_manager/server/task_pool.ts @@ -112,9 +112,21 @@ export class TaskPool { if (tasksToRun.length) { await Promise.all( tasksToRun - .filter((taskRunner) => !this.tasksInPool.has(taskRunner.id)) + .filter( + (taskRunner) => + !Array.from(this.tasksInPool.keys()).some((executionId: string) => + taskRunner.isSameTask(executionId) + ) + ) .map(async (taskRunner) => { - this.tasksInPool.set(taskRunner.id, taskRunner); + // We use taskRunner.taskExecutionId instead of taskRunner.id as key for the task pool map because + // task cancellation is a non-blocking procedure. We calculate the expiration and immediately remove + // the task from the task pool. There is a race condition that can occur when a recurring task's schedule + // matches its timeout value. A new instance of the task can be claimed and added to the task pool before + // the cancel function (meant for the previous instance of the task) is actually called. This means the wrong + // task instance is cancelled. We introduce the taskExecutionId to differentiate between these overlapping instances and + // ensure that the correct task instance is cancelled. + this.tasksInPool.set(taskRunner.taskExecutionId, taskRunner); return taskRunner .markTaskAsRunning() .then((hasTaskBeenMarkAsRunning: boolean) => @@ -165,12 +177,12 @@ export class TaskPool { } }) .then(() => { - this.tasksInPool.delete(taskRunner.id); + this.tasksInPool.delete(taskRunner.taskExecutionId); }); } private handleFailureOfMarkAsRunning(task: TaskRunner, err: Error) { - this.tasksInPool.delete(task.id); + this.tasksInPool.delete(task.taskExecutionId); this.logger.error(`Failed to mark Task ${task.toString()} as running: ${err.message}`); } @@ -198,7 +210,7 @@ export class TaskPool { private async cancelTask(task: TaskRunner) { try { this.logger.debug(`Cancelling task ${task.toString()}.`); - this.tasksInPool.delete(task.id); + this.tasksInPool.delete(task.taskExecutionId); await task.cancel(); } catch (err) { this.logger.error(`Failed to cancel task ${task.toString()}: ${err}`); diff --git a/x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts b/x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts index 358a8003382b0..0695ed149c9a4 100644 --- a/x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts @@ -12,6 +12,7 @@ */ import apm from 'elastic-apm-node'; +import uuid from 'uuid'; import { withSpan } from '@kbn/apm-utils'; import { identity } from 'lodash'; import { Logger, ExecutionContextStart } from '../../../../../src/core/server'; @@ -75,6 +76,7 @@ export class EphemeralTaskManagerRunner implements TaskRunner { private beforeRun: Middleware['beforeRun']; private beforeMarkRunning: Middleware['beforeMarkRunning']; private onTaskEvent: (event: TaskRun | TaskMarkRunning) => void; + private uuid: string; private readonly executionContext: ExecutionContextStart; /** @@ -102,6 +104,7 @@ export class EphemeralTaskManagerRunner implements TaskRunner { this.beforeMarkRunning = beforeMarkRunning; this.onTaskEvent = onTaskEvent; this.executionContext = executionContext; + this.uuid = uuid.v4(); } /** @@ -111,6 +114,21 @@ export class EphemeralTaskManagerRunner implements TaskRunner { return this.instance.task.id; } + /** + * Gets the execution id of this task instance.
+ */ + public get taskExecutionId() { + return `${this.id}::${this.uuid}`; + } + + /** + * Test whether given execution ID identifies a different execution of this same task + * @param id + */ + public isSameTask(executionId: string) { + return executionId.startsWith(this.id); + } + /** * Gets the task type of this task instance. */ diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts index 86e2230461eb5..02be86c3db0c2 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts @@ -32,6 +32,10 @@ const minutesFromNow = (mins: number): Date => secondsFromNow(mins * 60); let fakeTimer: sinon.SinonFakeTimers; +jest.mock('uuid', () => ({ + v4: () => 'NEW_UUID', +})); + beforeAll(() => { fakeTimer = sinon.useFakeTimers(); }); @@ -45,6 +49,19 @@ describe('TaskManagerRunner', () => { end: jest.fn(), }; + test('execution ID', async () => { + const { runner } = await pendingStageSetup({ + instance: { + id: 'foo', + taskType: 'bar', + }, + }); + + expect(runner.taskExecutionId).toEqual(`foo::NEW_UUID`); + expect(runner.isSameTask(`foo::ANOTHER_UUID`)).toEqual(true); + expect(runner.isSameTask(`bar::ANOTHER_UUID`)).toEqual(false); + }); + describe('Pending Stage', () => { beforeEach(() => { jest.clearAllMocks(); diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts index 2a5d48845ce48..cd07b4db728c4 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -12,6 +12,7 @@ */ import apm from 'elastic-apm-node'; +import uuid from 'uuid'; import { withSpan } from '@kbn/apm-utils'; import { identity, defaults, flow } from 'lodash'; import { @@ -68,9 +69,11 @@ export interface TaskRunner { markTaskAsRunning: () => Promise; run: () => Promise>; id: string; + taskExecutionId: string; stage: string; isEphemeral?: boolean; toString: () => string; + isSameTask: (executionId: string) => boolean; } export enum TaskRunningStage { @@ -141,6 +144,7 @@ export class TaskManagerRunner implements TaskRunner { private beforeMarkRunning: Middleware['beforeMarkRunning']; private onTaskEvent: (event: TaskRun | TaskMarkRunning) => void; private defaultMaxAttempts: number; + private uuid: string; private readonly executionContext: ExecutionContextStart; /** @@ -173,6 +177,7 @@ export class TaskManagerRunner implements TaskRunner { this.onTaskEvent = onTaskEvent; this.defaultMaxAttempts = defaultMaxAttempts; this.executionContext = executionContext; + this.uuid = uuid.v4(); } /** @@ -182,6 +187,21 @@ export class TaskManagerRunner implements TaskRunner { return this.instance.task.id; } + /** + * Gets the execution id of this task instance. + */ + public get taskExecutionId() { + return `${this.id}::${this.uuid}`; + } + + /** + * Test whether given execution ID identifies a different execution of this same task + * @param id + */ + public isSameTask(executionId: string) { + return executionId.startsWith(this.id); + } + /** * Gets the task type of this task instance. 
*/ diff --git a/x-pack/plugins/triggers_actions_ui/public/application/sections/alerts_list/translations.ts b/x-pack/plugins/triggers_actions_ui/public/application/sections/alerts_list/translations.ts index 95322fa0d0bcf..8181a5171d198 100644 --- a/x-pack/plugins/triggers_actions_ui/public/application/sections/alerts_list/translations.ts +++ b/x-pack/plugins/triggers_actions_ui/public/application/sections/alerts_list/translations.ts @@ -92,10 +92,18 @@ export const ALERT_ERROR_LICENSE_REASON = i18n.translate( } ); +export const ALERT_ERROR_TIMEOUT_REASON = i18n.translate( + 'xpack.triggersActionsUI.sections.alertsList.alertErrorReasonTimeout', + { + defaultMessage: 'Rule execution cancelled due to timeout.', + } +); + export const alertsErrorReasonTranslationsMapping = { read: ALERT_ERROR_READING_REASON, decrypt: ALERT_ERROR_DECRYPTING_REASON, execute: ALERT_ERROR_EXECUTION_REASON, unknown: ALERT_ERROR_UNKNOWN_REASON, license: ALERT_ERROR_LICENSE_REASON, + timeout: ALERT_ERROR_TIMEOUT_REASON, }; diff --git a/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/alert_types.ts b/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/alert_types.ts index 7978d6c82025f..00adf916bb713 100644 --- a/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/alert_types.ts +++ b/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/alert_types.ts @@ -465,6 +465,56 @@ function getPatternFiringAlertType() { return result; } +function getLongRunningPatternRuleType(cancelAlertsOnRuleTimeout: boolean = true) { + const paramsSchema = schema.object({ + pattern: schema.arrayOf(schema.boolean()), + }); + type ParamsType = TypeOf<typeof paramsSchema>; + interface State extends AlertTypeState { + patternIndex?: number; + } + const result: AlertType<ParamsType, never, State, {}, {}, 'default'> = { + id: `test.patternLongRunning${ + cancelAlertsOnRuleTimeout === true ? '.cancelAlertsOnRuleTimeout' : '' + }`, + name: 'Test: Run Long on a Pattern', + actionGroups: [{ id: 'default', name: 'Default' }], + producer: 'alertsFixture', + defaultActionGroupId: 'default', + minimumLicenseRequired: 'basic', + isExportable: true, + ruleTaskTimeout: '3s', + cancelAlertsOnRuleTimeout, + async executor(ruleExecutorOptions) { + const { services, state, params } = ruleExecutorOptions; + const pattern = params.pattern; + if (!Array.isArray(pattern)) { + throw new Error(`pattern is not an array`); + } + + // await new Promise((resolve) => setTimeout(resolve, 5000)); + + // get the pattern index, return if past it + const patternIndex = state.patternIndex ??
0; + if (patternIndex >= pattern.length) { + return { patternIndex }; + } + + // run long if pattern says to + if (pattern[patternIndex] === true) { + await new Promise((resolve) => setTimeout(resolve, 10000)); + } + + services.alertInstanceFactory('alert').scheduleActions('default', {}); + + return { + patternIndex: patternIndex + 1, + }; + }, + }; + return result; +} + export function defineAlertTypes( core: CoreSetup, { alerting }: Pick @@ -579,4 +629,6 @@ export function defineAlertTypes( alerting.registerType(longRunningAlertType); alerting.registerType(goldNoopAlertType); alerting.registerType(exampleAlwaysFiringAlertType); + alerting.registerType(getLongRunningPatternRuleType()); + alerting.registerType(getLongRunningPatternRuleType(false)); } diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/index.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/index.ts index 965cf352ab7ed..14748737f0d53 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/index.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/index.ts @@ -12,5 +12,6 @@ export default function alertingTests({ loadTestFile }: FtrProviderContext) { describe('builtin alertTypes', () => { loadTestFile(require.resolve('./index_threshold')); loadTestFile(require.resolve('./es_query')); + loadTestFile(require.resolve('./long_running')); }); } diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/long_running/index.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/long_running/index.ts new file mode 100644 index 0000000000000..d997cebc6fd8d --- /dev/null +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/long_running/index.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { FtrProviderContext } from '../../../../../common/ftr_provider_context'; + +// eslint-disable-next-line import/no-default-export +export default function alertingTests({ loadTestFile }: FtrProviderContext) { + describe('long_running_rule', () => { + loadTestFile(require.resolve('./rule')); + }); +} diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/long_running/rule.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/long_running/rule.ts new file mode 100644 index 0000000000000..da9e7aadb571e --- /dev/null +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/builtin_alert_types/long_running/rule.ts @@ -0,0 +1,161 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import expect from '@kbn/expect'; + +import { Spaces } from '../../../../scenarios'; +import { FtrProviderContext } from '../../../../../common/ftr_provider_context'; +import { getUrlPrefix, ObjectRemover, getEventLog } from '../../../../../common/lib'; + +const RULE_INTERVAL_SECONDS = 3; + +// eslint-disable-next-line import/no-default-export +export default function ruleTests({ getService }: FtrProviderContext) { + const supertest = getService('supertest'); + const retry = getService('retry'); + + describe('rule', async () => { + const objectRemover = new ObjectRemover(supertest); + + afterEach(async () => { + await objectRemover.removeAll(); + }); + + it('writes event log document for timeout for each rule execution that ends in timeout - every execution times out', async () => { + const ruleId = await createRule({ + name: 'long running rule', + ruleTypeId: 'test.patternLongRunning.cancelAlertsOnRuleTimeout', + pattern: [true, true, true, true], + }); + // get the events we're expecting + const events = await retry.try(async () => { + return await getEventLog({ + getService, + spaceId: Spaces.space1.id, + type: 'alert', + id: ruleId, + provider: 'alerting', + actions: new Map([ + // make sure the counts of the # of events per type are as expected + ['execute-start', { gte: 4 }], + ['execute', { gte: 4 }], + ['execute-timeout', { gte: 4 }], + ]), + }); + }); + + // no active|recovered|new instance events should exist + expect(events.filter((event) => event?.event?.action === 'active-instance').length).to.equal( + 0 + ); + expect(events.filter((event) => event?.event?.action === 'new-instance').length).to.equal(0); + expect( + events.filter((event) => event?.event?.action === 'recovered-instance').length + ).to.equal(0); + + // rule execution status should be in error with reason timeout + const { status, body: rule } = await supertest.get( + `${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule/${ruleId}` + ); + expect(status).to.eql(200); + expect(rule.execution_status.status).to.eql('error'); + expect(rule.execution_status.error.message).to.eql( + `test.patternLongRunning.cancelAlertsOnRuleTimeout:${ruleId}: execution cancelled due to timeout - exceeded rule type timeout of 3s` + ); + expect(rule.execution_status.error.reason).to.eql('timeout'); + }); + + it('writes event log document for timeout for each rule execution that ends in timeout - some executions times out', async () => { + const ruleId = await createRule({ + name: 'long running rule', + ruleTypeId: 'test.patternLongRunning.cancelAlertsOnRuleTimeout', + pattern: [false, true, false, false], + }); + // get the events we're expecting + await retry.try(async () => { + return await getEventLog({ + getService, + spaceId: Spaces.space1.id, + type: 'alert', + id: ruleId, + provider: 'alerting', + actions: new Map([ + // make sure the counts of the # of events per type are as expected + ['execute-start', { gte: 4 }], + ['execute', { gte: 4 }], + ['execute-timeout', { gte: 1 }], + ['new-instance', { equal: 1 }], + ['active-instance', { gte: 2 }], + ]), + }); + }); + + const { status, body: rule } = await supertest.get( + `${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule/${ruleId}` + ); + expect(status).to.eql(200); + expect(rule.execution_status.status).to.eql('active'); + }); + + it('still logs alert docs when rule exceeds timeout when cancelAlertsOnRuleTimeout is false on rule type', async () => { + const ruleId = await createRule({ + name: 'long running rule', + ruleTypeId: 'test.patternLongRunning', + pattern: [true, 
true, true, true], + }); + // get the events we're expecting + await retry.try(async () => { + return await getEventLog({ + getService, + spaceId: Spaces.space1.id, + type: 'alert', + id: ruleId, + provider: 'alerting', + actions: new Map([ + // make sure the counts of the # of events per type are as expected + ['execute-start', { gte: 4 }], + ['execute', { gte: 4 }], + ['execute-timeout', { gte: 4 }], + ['new-instance', { gte: 1 }], + ['active-instance', { gte: 3 }], + ]), + }); + }); + }); + + interface CreateRuleParams { + name: string; + ruleTypeId: string; + pattern: boolean[]; + } + + async function createRule(params: CreateRuleParams): Promise<string> { + const { status, body: createdRule } = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) + .set('kbn-xsrf', 'foo') + .send({ + name: params.name, + consumer: 'alerts', + enabled: true, + rule_type_id: params.ruleTypeId, + schedule: { interval: `${RULE_INTERVAL_SECONDS}s` }, + actions: [], + notify_when: 'onActiveAlert', + params: { + pattern: params.pattern, + }, + }); + + expect(status).to.be(200); + + const ruleId = createdRule.id; + objectRemover.add(Spaces.space1.id, ruleId, 'rule', 'alerting'); + + return ruleId; + } + }); +}
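For reference, the `cancelAlertsOnRuleTimeout` flag introduced above lives in two places: a global default (`xpack.alerting.cancelAlertsOnRuleTimeout` in kibana.yml) and a per-rule-type option supplied at registration time; alerts and actions are only skipped for a cancelled execution when both values are left at `true`. Below is a minimal, hypothetical TypeScript sketch of a rule type that opts out, modelled on the `getLongRunningPatternRuleType` fixture in this diff. The id, name, params schema, and import path are illustrative and not part of this change:

import { schema, TypeOf } from '@kbn/config-schema';
// Illustrative import path; the actual path depends on where the rule type is defined.
import { AlertType, AlertTypeState, AlertInstanceState, AlertInstanceContext } from '../types';

const paramsSchema = schema.object({ threshold: schema.number() });
type Params = TypeOf<typeof paramsSchema>;

export const exampleLongRunningRuleType: AlertType<
  Params,
  never,
  AlertTypeState,
  AlertInstanceState,
  AlertInstanceContext,
  'default'
> = {
  id: 'example.longRunning', // illustrative id
  name: 'Example: long running rule',
  actionGroups: [{ id: 'default', name: 'Default' }],
  defaultActionGroupId: 'default',
  producer: 'alertsFixture',
  minimumLicenseRequired: 'basic',
  isExportable: true,
  validate: { params: paramsSchema },
  // cancel executions of this rule type that run longer than 3 seconds
  ruleTaskTimeout: '3s',
  // when an execution is cancelled, still write alerts and schedule actions
  // for whatever the executor produced before the timeout
  cancelAlertsOnRuleTimeout: false,
  async executor({ services }) {
    // schedule an action for a single illustrative alert instance
    services.alertInstanceFactory('example').scheduleActions('default', {});
    return {};
  },
};

// Registered from the alerting plugin's setup contract, for example:
//   alerting.registerType(exampleLongRunningRuleType);
// The global default can be changed in kibana.yml:
//   xpack.alerting.cancelAlertsOnRuleTimeout: false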