From 3e18fede761cebd3b345963c581e4cbd926bb1ab Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Wed, 8 Sep 2021 15:37:07 -0400 Subject: [PATCH 1/4] Adding abortable es client factory --- .../create_abortable_es_client_factory.ts | 50 +++++++++++++++++++ .../server/task_runner/task_runner.ts | 13 +++++ x-pack/plugins/alerting/server/types.ts | 7 +++ .../server/alert_types/es_query/alert_type.ts | 7 ++- 4 files changed, 73 insertions(+), 4 deletions(-) create mode 100644 x-pack/plugins/alerting/server/alert_instance/create_abortable_es_client_factory.ts diff --git a/x-pack/plugins/alerting/server/alert_instance/create_abortable_es_client_factory.ts b/x-pack/plugins/alerting/server/alert_instance/create_abortable_es_client_factory.ts new file mode 100644 index 0000000000000..a67f537b3a102 --- /dev/null +++ b/x-pack/plugins/alerting/server/alert_instance/create_abortable_es_client_factory.ts @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport'; +import { BehaviorSubject } from 'rxjs'; +import { filter } from 'rxjs/operators'; +import { IScopedClusterClient, Logger } from 'src/core/server'; +import type { ESSearchRequest } from 'src/core/types/elasticsearch'; + +export interface CreateAbortableEsClientFactoryOpts { + scopedClusterClient: IScopedClusterClient; + cancelled$: BehaviorSubject; + logger: Logger; +} + +export function createAbortableEsClientFactory(opts: CreateAbortableEsClientFactoryOpts) { + const { scopedClusterClient, cancelled$, logger } = opts; + return (query: ESSearchRequest, asInternalUser: boolean = true) => { + const esClient = asInternalUser + ? scopedClusterClient.asInternalUser + : scopedClusterClient.asCurrentUser; + + return cancelEsRequestOnAbort(esClient.search(query), cancelled$, logger); + }; +} + +export function cancelEsRequestOnAbort>( + promise: T, + cancelled$: BehaviorSubject, + logger: Logger +) { + const subscription = cancelled$.pipe(filter((cancelled: boolean) => cancelled)).subscribe(() => { + logger.info(`Cancelling es search!!!!!`); + promise.abort(); + }); + + // using .catch() here means unsubscribe will be called + // after it has thrown an error, so we use .then(onSuccess, onFailure) + // syntax + promise.then( + () => subscription.unsubscribe(), + () => subscription.unsubscribe() + ); + + return promise; +} diff --git a/x-pack/plugins/alerting/server/task_runner/task_runner.ts b/x-pack/plugins/alerting/server/task_runner/task_runner.ts index 6694146c3e2b9..05d67776ca1f7 100644 --- a/x-pack/plugins/alerting/server/task_runner/task_runner.ts +++ b/x-pack/plugins/alerting/server/task_runner/task_runner.ts @@ -8,6 +8,7 @@ import type { PublicMethodsOf } from '@kbn/utility-types'; import { Dictionary, pickBy, mapValues, without, cloneDeep } from 'lodash'; import type { Request } from '@hapi/hapi'; +import { BehaviorSubject } from 'rxjs'; import { addSpaceIdToPath } from '../../../spaces/server'; import { Logger, KibanaRequest } from '../../../../../src/core/server'; import { TaskRunnerContext } from './task_runner_factory'; @@ -51,6 +52,7 @@ import { } from '../../common'; import { NormalizedAlertType } from '../rule_type_registry'; import { getEsErrorMessage } from '../lib/errors'; +import { createAbortableEsClientFactory } from '../alert_instance/create_abortable_es_client_factory'; const FALLBACK_RETRY_INTERVAL = '5m'; @@ -90,6 +92,7 @@ export class TaskRunner< RecoveryActionGroupId >; private readonly ruleTypeRegistry: RuleTypeRegistry; + private cancelled$: BehaviorSubject; constructor( alertType: NormalizedAlertType< @@ -109,6 +112,7 @@ export class TaskRunner< this.alertType = alertType; this.taskInstance = taskInstanceToAlertTaskInstance(taskInstance); this.ruleTypeRegistry = context.ruleTypeRegistry; + this.cancelled$ = new BehaviorSubject(false); } async getApiKeyForAlertPermissions(alertId: string, spaceId: string) { @@ -281,6 +285,11 @@ export class TaskRunner< InstanceContext, WithoutReservedActionGroups >(alertInstances), + abortableEsClient: createAbortableEsClientFactory({ + scopedClusterClient: services.scopedClusterClient, + cancelled$: this.cancelled$, + logger: this.logger, + }), }, params, state: alertTypeState as State, @@ -649,6 +658,10 @@ export class TaskRunner< }), }; } + + async cancel(): Promise { + this.cancelled$.next(true); + } } interface TrackAlertDurationsParams< diff --git a/x-pack/plugins/alerting/server/types.ts b/x-pack/plugins/alerting/server/types.ts index ba35890efd781..428de1e196086 100644 --- a/x-pack/plugins/alerting/server/types.ts +++ b/x-pack/plugins/alerting/server/types.ts @@ -7,6 +7,9 @@ import type { IRouter, RequestHandlerContext, SavedObjectReference } from 'src/core/server'; import type { PublicMethodsOf } from '@kbn/utility-types'; +import { ApiResponse, TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport'; +import { SearchResponse } from '@elastic/elasticsearch/api/types'; +import { ESSearchRequest } from 'src/core/types/elasticsearch'; import { PublicAlertInstance } from './alert_instance'; import { RuleTypeRegistry as OrigruleTypeRegistry } from './rule_type_registry'; import { PluginSetupContract, PluginStartContract } from './plugin'; @@ -75,6 +78,10 @@ export interface AlertServices< alertInstanceFactory: ( id: string ) => PublicAlertInstance; + abortableEsClient: ( + query: ESSearchRequest, + asInternalUser: boolean + ) => TransportRequestPromise, unknown>>; } export interface AlertExecutorOptions< diff --git a/x-pack/plugins/stack_alerts/server/alert_types/es_query/alert_type.ts b/x-pack/plugins/stack_alerts/server/alert_types/es_query/alert_type.ts index 5e2e4699c25ca..1635582d3e933 100644 --- a/x-pack/plugins/stack_alerts/server/alert_types/es_query/alert_type.ts +++ b/x-pack/plugins/stack_alerts/server/alert_types/es_query/alert_type.ts @@ -164,7 +164,6 @@ export function getAlertType( const { alertId, name, services, params, state } = options; const previousTimestamp = state.latestTimestamp; - const esClient = services.scopedClusterClient.asCurrentUser; const { parsedQuery, dateStart, dateEnd } = getSearchParams(params); const compareFn = ComparatorFns.get(params.thresholdComparator); @@ -225,9 +224,9 @@ export function getAlertType( logger.debug(`alert ${ES_QUERY_ID}:${alertId} "${name}" query - ${JSON.stringify(query)}`); - const { body: searchResult } = await esClient.search(query); + const { body: searchResult } = await services.abortableEsClient(query, false); - logger.debug( + logger.info( `alert ${ES_QUERY_ID}:${alertId} "${name}" result - ${JSON.stringify(searchResult)}` ); @@ -256,7 +255,7 @@ export function getAlertType( }; const actionContext = addMessages(options, baseContext, params); - const alertInstance = options.services.alertInstanceFactory(ConditionMetAlertInstanceId); + const alertInstance = services.alertInstanceFactory(ConditionMetAlertInstanceId); alertInstance // store the params we would need to recreate the query that led to this alert instance .replaceState({ latestTimestamp: timestamp, dateStart, dateEnd }) From fbc40bc6b00fb4b94286f3b18d1b02904c94f7ec Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Thu, 9 Sep 2021 10:18:06 -0400 Subject: [PATCH 2/4] Writing event log entry when task is cancelled --- x-pack/plugins/alerting/server/plugin.ts | 1 + .../server/task_runner/task_runner.ts | 36 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/x-pack/plugins/alerting/server/plugin.ts b/x-pack/plugins/alerting/server/plugin.ts index bb42beba6e237..849faba958d2f 100644 --- a/x-pack/plugins/alerting/server/plugin.ts +++ b/x-pack/plugins/alerting/server/plugin.ts @@ -79,6 +79,7 @@ export const EVENT_LOG_ACTIONS = { newInstance: 'new-instance', recoveredInstance: 'recovered-instance', activeInstance: 'active-instance', + cancel: 'cancelled', }; export const LEGACY_EVENT_LOG_ACTIONS = { resolvedInstance: 'resolved-instance', diff --git a/x-pack/plugins/alerting/server/task_runner/task_runner.ts b/x-pack/plugins/alerting/server/task_runner/task_runner.ts index 05d67776ca1f7..6b2afdda5a26c 100644 --- a/x-pack/plugins/alerting/server/task_runner/task_runner.ts +++ b/x-pack/plugins/alerting/server/task_runner/task_runner.ts @@ -660,7 +660,43 @@ export class TaskRunner< } async cancel(): Promise { + // Received cancel signal from Task Manager this.cancelled$.next(true); + + // Write event log entry + const { + params: { alertId, spaceId }, + } = this.taskInstance; + const namespace = this.context.spaceIdToNamespace(spaceId); + + const eventLogger = this.context.eventLogger; + const event: IEvent = { + '@timestamp': new Date().toISOString(), + event: { + action: EVENT_LOG_ACTIONS.cancel, + kind: 'alert', + category: [this.alertType.producer], + }, + message: `rule execution cancelled by task manager: "${alertId}"`, + kibana: { + saved_objects: [ + { + rel: SAVED_OBJECT_REL_PRIMARY, + type: 'alert', + id: alertId, + type_id: this.alertType.id, + namespace, + }, + ], + }, + rule: { + id: alertId, + license: this.alertType.minimumLicenseRequired, + category: this.alertType.id, + ruleset: this.alertType.producer, + }, + }; + eventLogger.logEvent(event); } } From 38dd10c6c07331e489b3b59d346052e2b0de6d18 Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Fri, 10 Sep 2021 10:18:46 -0400 Subject: [PATCH 3/4] Passing service into rule executor so rules can determine if cancel signal has been sent --- .../alerting/server/task_runner/task_runner.ts | 7 +++++++ x-pack/plugins/alerting/server/types.ts | 1 + .../server/alert_types/es_query/alert_type.ts | 11 +++++++++++ x-pack/plugins/task_manager/server/task.ts | 2 +- 4 files changed, 20 insertions(+), 1 deletion(-) diff --git a/x-pack/plugins/alerting/server/task_runner/task_runner.ts b/x-pack/plugins/alerting/server/task_runner/task_runner.ts index 6b2afdda5a26c..54fb06a90443d 100644 --- a/x-pack/plugins/alerting/server/task_runner/task_runner.ts +++ b/x-pack/plugins/alerting/server/task_runner/task_runner.ts @@ -290,6 +290,7 @@ export class TaskRunner< cancelled$: this.cancelled$, logger: this.logger, }), + executionIsCancelled: createExecutionCancelledService(this.cancelled$), }, params, state: alertTypeState as State, @@ -1016,3 +1017,9 @@ async function errorAsAlertTaskRunResult( }; } } + +function createExecutionCancelledService(cancelled$: BehaviorSubject) { + return () => { + return cancelled$.getValue(); + }; +} diff --git a/x-pack/plugins/alerting/server/types.ts b/x-pack/plugins/alerting/server/types.ts index 428de1e196086..71dde19f0e1e2 100644 --- a/x-pack/plugins/alerting/server/types.ts +++ b/x-pack/plugins/alerting/server/types.ts @@ -82,6 +82,7 @@ export interface AlertServices< query: ESSearchRequest, asInternalUser: boolean ) => TransportRequestPromise, unknown>>; + executionIsCancelled: () => boolean; } export interface AlertExecutorOptions< diff --git a/x-pack/plugins/stack_alerts/server/alert_types/es_query/alert_type.ts b/x-pack/plugins/stack_alerts/server/alert_types/es_query/alert_type.ts index 1635582d3e933..cbbc18626ff61 100644 --- a/x-pack/plugins/stack_alerts/server/alert_types/es_query/alert_type.ts +++ b/x-pack/plugins/stack_alerts/server/alert_types/es_query/alert_type.ts @@ -226,6 +226,17 @@ export function getAlertType( const { body: searchResult } = await services.abortableEsClient(query, false); + // artificially set long delay for testing + await new Promise((done) => setTimeout(() => done(), 120000)); + + // check if execution is cancelled + if (services.executionIsCancelled()) { + logger.warn(`cancel signal received! return early from rule executor`); + return; + } else { + logger.info('execution is not cancelled!'); + } + logger.info( `alert ${ES_QUERY_ID}:${alertId} "${name}" result - ${JSON.stringify(searchResult)}` ); diff --git a/x-pack/plugins/task_manager/server/task.ts b/x-pack/plugins/task_manager/server/task.ts index 2452e3e6f4920..7dcd6562b941d 100644 --- a/x-pack/plugins/task_manager/server/task.ts +++ b/x-pack/plugins/task_manager/server/task.ts @@ -116,7 +116,7 @@ export const taskDefinitionSchema = schema.object( * the task will be re-attempted. */ timeout: schema.string({ - defaultValue: '5m', + defaultValue: '1m', // SETTING THIS LOW FOR TESTING!!! }), /** * Up to how many times the task should retry when it fails to run. This will From a50407b4832ae7ab7377d3ea36e6a7867ec87cbb Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Fri, 10 Sep 2021 10:24:05 -0400 Subject: [PATCH 4/4] Skipping scheduling actions if rule execution is cancelled --- .../alerting/server/task_runner/task_runner.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/x-pack/plugins/alerting/server/task_runner/task_runner.ts b/x-pack/plugins/alerting/server/task_runner/task_runner.ts index 54fb06a90443d..c512368e3604d 100644 --- a/x-pack/plugins/alerting/server/task_runner/task_runner.ts +++ b/x-pack/plugins/alerting/server/task_runner/task_runner.ts @@ -375,7 +375,7 @@ export class TaskRunner< rule: alert, }); - if (!muteAll) { + if (!muteAll && !this.cancelled$.getValue()) { const mutedInstanceIdsSet = new Set(mutedInstanceIds); scheduleActionsForRecoveredInstances({ @@ -429,7 +429,14 @@ export class TaskRunner< ) ); } else { - this.logger.debug(`no scheduling of actions for alert ${alertLabel}: alert is muted.`); + if (muteAll) { + this.logger.debug(`no scheduling of actions for alert ${alertLabel}: alert is muted.`); + } + if (this.cancelled$.getValue()) { + this.logger.debug( + `no scheduling of actions for alert ${alertLabel}: alert execution has been cancelled.` + ); + } } return {