diff --git a/x-pack/plugins/alerting/server/alert_instance/create_abortable_es_client_factory.ts b/x-pack/plugins/alerting/server/alert_instance/create_abortable_es_client_factory.ts new file mode 100644 index 0000000000000..a67f537b3a102 --- /dev/null +++ b/x-pack/plugins/alerting/server/alert_instance/create_abortable_es_client_factory.ts @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport'; +import { BehaviorSubject } from 'rxjs'; +import { filter } from 'rxjs/operators'; +import { IScopedClusterClient, Logger } from 'src/core/server'; +import type { ESSearchRequest } from 'src/core/types/elasticsearch'; + +export interface CreateAbortableEsClientFactoryOpts { + scopedClusterClient: IScopedClusterClient; + cancelled$: BehaviorSubject; + logger: Logger; +} + +export function createAbortableEsClientFactory(opts: CreateAbortableEsClientFactoryOpts) { + const { scopedClusterClient, cancelled$, logger } = opts; + return (query: ESSearchRequest, asInternalUser: boolean = true) => { + const esClient = asInternalUser + ? scopedClusterClient.asInternalUser + : scopedClusterClient.asCurrentUser; + + return cancelEsRequestOnAbort(esClient.search(query), cancelled$, logger); + }; +} + +export function cancelEsRequestOnAbort>( + promise: T, + cancelled$: BehaviorSubject, + logger: Logger +) { + const subscription = cancelled$.pipe(filter((cancelled: boolean) => cancelled)).subscribe(() => { + logger.info(`Cancelling es search!!!!!`); + promise.abort(); + }); + + // using .catch() here means unsubscribe will be called + // after it has thrown an error, so we use .then(onSuccess, onFailure) + // syntax + promise.then( + () => subscription.unsubscribe(), + () => subscription.unsubscribe() + ); + + return promise; +} diff --git a/x-pack/plugins/alerting/server/plugin.ts b/x-pack/plugins/alerting/server/plugin.ts index bb42beba6e237..849faba958d2f 100644 --- a/x-pack/plugins/alerting/server/plugin.ts +++ b/x-pack/plugins/alerting/server/plugin.ts @@ -79,6 +79,7 @@ export const EVENT_LOG_ACTIONS = { newInstance: 'new-instance', recoveredInstance: 'recovered-instance', activeInstance: 'active-instance', + cancel: 'cancelled', }; export const LEGACY_EVENT_LOG_ACTIONS = { resolvedInstance: 'resolved-instance', diff --git a/x-pack/plugins/alerting/server/task_runner/task_runner.ts b/x-pack/plugins/alerting/server/task_runner/task_runner.ts index 6694146c3e2b9..c512368e3604d 100644 --- a/x-pack/plugins/alerting/server/task_runner/task_runner.ts +++ b/x-pack/plugins/alerting/server/task_runner/task_runner.ts @@ -8,6 +8,7 @@ import type { PublicMethodsOf } from '@kbn/utility-types'; import { Dictionary, pickBy, mapValues, without, cloneDeep } from 'lodash'; import type { Request } from '@hapi/hapi'; +import { BehaviorSubject } from 'rxjs'; import { addSpaceIdToPath } from '../../../spaces/server'; import { Logger, KibanaRequest } from '../../../../../src/core/server'; import { TaskRunnerContext } from './task_runner_factory'; @@ -51,6 +52,7 @@ import { } from '../../common'; import { NormalizedAlertType } from '../rule_type_registry'; import { getEsErrorMessage } from '../lib/errors'; +import { createAbortableEsClientFactory } from '../alert_instance/create_abortable_es_client_factory'; const FALLBACK_RETRY_INTERVAL = '5m'; @@ -90,6 +92,7 @@ export class TaskRunner< RecoveryActionGroupId >; private readonly ruleTypeRegistry: RuleTypeRegistry; + private cancelled$: BehaviorSubject; constructor( alertType: NormalizedAlertType< @@ -109,6 +112,7 @@ export class TaskRunner< this.alertType = alertType; this.taskInstance = taskInstanceToAlertTaskInstance(taskInstance); this.ruleTypeRegistry = context.ruleTypeRegistry; + this.cancelled$ = new BehaviorSubject(false); } async getApiKeyForAlertPermissions(alertId: string, spaceId: string) { @@ -281,6 +285,12 @@ export class TaskRunner< InstanceContext, WithoutReservedActionGroups >(alertInstances), + abortableEsClient: createAbortableEsClientFactory({ + scopedClusterClient: services.scopedClusterClient, + cancelled$: this.cancelled$, + logger: this.logger, + }), + executionIsCancelled: createExecutionCancelledService(this.cancelled$), }, params, state: alertTypeState as State, @@ -365,7 +375,7 @@ export class TaskRunner< rule: alert, }); - if (!muteAll) { + if (!muteAll && !this.cancelled$.getValue()) { const mutedInstanceIdsSet = new Set(mutedInstanceIds); scheduleActionsForRecoveredInstances({ @@ -419,7 +429,14 @@ export class TaskRunner< ) ); } else { - this.logger.debug(`no scheduling of actions for alert ${alertLabel}: alert is muted.`); + if (muteAll) { + this.logger.debug(`no scheduling of actions for alert ${alertLabel}: alert is muted.`); + } + if (this.cancelled$.getValue()) { + this.logger.debug( + `no scheduling of actions for alert ${alertLabel}: alert execution has been cancelled.` + ); + } } return { @@ -649,6 +666,46 @@ export class TaskRunner< }), }; } + + async cancel(): Promise { + // Received cancel signal from Task Manager + this.cancelled$.next(true); + + // Write event log entry + const { + params: { alertId, spaceId }, + } = this.taskInstance; + const namespace = this.context.spaceIdToNamespace(spaceId); + + const eventLogger = this.context.eventLogger; + const event: IEvent = { + '@timestamp': new Date().toISOString(), + event: { + action: EVENT_LOG_ACTIONS.cancel, + kind: 'alert', + category: [this.alertType.producer], + }, + message: `rule execution cancelled by task manager: "${alertId}"`, + kibana: { + saved_objects: [ + { + rel: SAVED_OBJECT_REL_PRIMARY, + type: 'alert', + id: alertId, + type_id: this.alertType.id, + namespace, + }, + ], + }, + rule: { + id: alertId, + license: this.alertType.minimumLicenseRequired, + category: this.alertType.id, + ruleset: this.alertType.producer, + }, + }; + eventLogger.logEvent(event); + } } interface TrackAlertDurationsParams< @@ -967,3 +1024,9 @@ async function errorAsAlertTaskRunResult( }; } } + +function createExecutionCancelledService(cancelled$: BehaviorSubject) { + return () => { + return cancelled$.getValue(); + }; +} diff --git a/x-pack/plugins/alerting/server/types.ts b/x-pack/plugins/alerting/server/types.ts index ba35890efd781..71dde19f0e1e2 100644 --- a/x-pack/plugins/alerting/server/types.ts +++ b/x-pack/plugins/alerting/server/types.ts @@ -7,6 +7,9 @@ import type { IRouter, RequestHandlerContext, SavedObjectReference } from 'src/core/server'; import type { PublicMethodsOf } from '@kbn/utility-types'; +import { ApiResponse, TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport'; +import { SearchResponse } from '@elastic/elasticsearch/api/types'; +import { ESSearchRequest } from 'src/core/types/elasticsearch'; import { PublicAlertInstance } from './alert_instance'; import { RuleTypeRegistry as OrigruleTypeRegistry } from './rule_type_registry'; import { PluginSetupContract, PluginStartContract } from './plugin'; @@ -75,6 +78,11 @@ export interface AlertServices< alertInstanceFactory: ( id: string ) => PublicAlertInstance; + abortableEsClient: ( + query: ESSearchRequest, + asInternalUser: boolean + ) => TransportRequestPromise, unknown>>; + executionIsCancelled: () => boolean; } export interface AlertExecutorOptions< diff --git a/x-pack/plugins/stack_alerts/server/alert_types/es_query/alert_type.ts b/x-pack/plugins/stack_alerts/server/alert_types/es_query/alert_type.ts index 5e2e4699c25ca..cbbc18626ff61 100644 --- a/x-pack/plugins/stack_alerts/server/alert_types/es_query/alert_type.ts +++ b/x-pack/plugins/stack_alerts/server/alert_types/es_query/alert_type.ts @@ -164,7 +164,6 @@ export function getAlertType( const { alertId, name, services, params, state } = options; const previousTimestamp = state.latestTimestamp; - const esClient = services.scopedClusterClient.asCurrentUser; const { parsedQuery, dateStart, dateEnd } = getSearchParams(params); const compareFn = ComparatorFns.get(params.thresholdComparator); @@ -225,9 +224,20 @@ export function getAlertType( logger.debug(`alert ${ES_QUERY_ID}:${alertId} "${name}" query - ${JSON.stringify(query)}`); - const { body: searchResult } = await esClient.search(query); + const { body: searchResult } = await services.abortableEsClient(query, false); - logger.debug( + // artificially set long delay for testing + await new Promise((done) => setTimeout(() => done(), 120000)); + + // check if execution is cancelled + if (services.executionIsCancelled()) { + logger.warn(`cancel signal received! return early from rule executor`); + return; + } else { + logger.info('execution is not cancelled!'); + } + + logger.info( `alert ${ES_QUERY_ID}:${alertId} "${name}" result - ${JSON.stringify(searchResult)}` ); @@ -256,7 +266,7 @@ export function getAlertType( }; const actionContext = addMessages(options, baseContext, params); - const alertInstance = options.services.alertInstanceFactory(ConditionMetAlertInstanceId); + const alertInstance = services.alertInstanceFactory(ConditionMetAlertInstanceId); alertInstance // store the params we would need to recreate the query that led to this alert instance .replaceState({ latestTimestamp: timestamp, dateStart, dateEnd }) diff --git a/x-pack/plugins/task_manager/server/task.ts b/x-pack/plugins/task_manager/server/task.ts index 2452e3e6f4920..7dcd6562b941d 100644 --- a/x-pack/plugins/task_manager/server/task.ts +++ b/x-pack/plugins/task_manager/server/task.ts @@ -116,7 +116,7 @@ export const taskDefinitionSchema = schema.object( * the task will be re-attempted. */ timeout: schema.string({ - defaultValue: '5m', + defaultValue: '1m', // SETTING THIS LOW FOR TESTING!!! }), /** * Up to how many times the task should retry when it fails to run. This will