From e2bd785da8d0afecf96cdfd94b73cfc8c076f1eb Mon Sep 17 00:00:00 2001 From: Chris Cowan Date: Mon, 28 Feb 2022 14:19:34 -0700 Subject: [PATCH] [RAC][Rule Registry] Paginate results for fetching existing alerts (#122474) (#126516) * [RAC][Rule Registry] Paginate results for fetching existing alerts * Change to difference to increase performance by 2 seconds for 50K alerts * Changing the pagination to break up the request into 10K chunks * Updating NOTICE.txt per CI instructions * Changing warning message to debug * Prefix log message with [Rule Registry] (cherry picked from commit 0236c8ae42051d569654857f67832e78d6da5391) # Conflicts: # x-pack/plugins/rule_registry/server/utils/create_lifecycle_executor.ts --- .../server/utils/create_lifecycle_executor.ts | 53 ++++--------- .../server/utils/fetch_existing_alerts.ts | 74 +++++++++++++++++++ 2 files changed, 88 insertions(+), 39 deletions(-) create mode 100644 x-pack/plugins/rule_registry/server/utils/fetch_existing_alerts.ts diff --git a/x-pack/plugins/rule_registry/server/utils/create_lifecycle_executor.ts b/x-pack/plugins/rule_registry/server/utils/create_lifecycle_executor.ts index 77a7d80613e6..626d90ec8520 100644 --- a/x-pack/plugins/rule_registry/server/utils/create_lifecycle_executor.ts +++ b/x-pack/plugins/rule_registry/server/utils/create_lifecycle_executor.ts @@ -10,6 +10,7 @@ import type { PublicContract } from '@kbn/utility-types'; import { getOrElse } from 'fp-ts/lib/Either'; import * as rt from 'io-ts'; import { v4 } from 'uuid'; +import { difference } from 'lodash'; import { AlertExecutorOptions, AlertInstance, @@ -24,7 +25,6 @@ import { ALERT_DURATION, ALERT_END, ALERT_INSTANCE_ID, - ALERT_RULE_UUID, ALERT_START, ALERT_STATUS, ALERT_STATUS_ACTIVE, @@ -39,6 +39,7 @@ import { } from '../../common/technical_rule_data_field_names'; import { IRuleDataClient } from '../rule_data_client'; import { AlertExecutorOptionsWithExtraServices } from '../types'; +import { fetchExistingAlerts } from './fetch_existing_alerts'; import { CommonAlertFieldName, CommonAlertIdFieldName, @@ -179,13 +180,13 @@ export const createLifecycleExecutor = const currentAlertIds = Object.keys(currentAlerts); const trackedAlertIds = Object.keys(state.trackedAlerts); - const newAlertIds = currentAlertIds.filter((alertId) => !trackedAlertIds.includes(alertId)); + const newAlertIds = difference(currentAlertIds, trackedAlertIds); const allAlertIds = [...new Set(currentAlertIds.concat(trackedAlertIds))]; const trackedAlertStates = Object.values(state.trackedAlerts); logger.debug( - `Tracking ${allAlertIds.length} alerts (${newAlertIds.length} new, ${trackedAlertStates.length} previous)` + `[Rule Registry] Tracking ${allAlertIds.length} alerts (${newAlertIds.length} new, ${trackedAlertStates.length} previous)` ); const trackedAlertsDataMap: Record< @@ -194,40 +195,14 @@ export const createLifecycleExecutor = > = {}; if (trackedAlertStates.length) { - const { hits } = await ruleDataClient.getReader().search({ - body: { - query: { - bool: { - filter: [ - { - term: { - [ALERT_RULE_UUID]: commonRuleFields[ALERT_RULE_UUID], - }, - }, - { - terms: { - [ALERT_UUID]: trackedAlertStates.map( - (trackedAlertState) => trackedAlertState.alertUuid - ), - }, - }, - ], - }, - }, - size: trackedAlertStates.length, - collapse: { - field: ALERT_UUID, - }, - sort: { - [TIMESTAMP]: 'desc' as const, - }, - }, - allow_no_indices: true, - }); - - hits.hits.forEach((hit) => { - const alertId = hit._source[ALERT_INSTANCE_ID]; - if (alertId) { + const result = await fetchExistingAlerts( + ruleDataClient, + trackedAlertStates, + commonRuleFields + ); + result.forEach((hit) => { + const alertId = hit._source ? hit._source[ALERT_INSTANCE_ID] : void 0; + if (alertId && hit._source) { trackedAlertsDataMap[alertId] = { indexName: hit._index, fields: hit._source, @@ -242,7 +217,7 @@ export const createLifecycleExecutor = const currentAlertData = currentAlerts[alertId]; if (!alertData) { - logger.warn(`Could not find alert data for ${alertId}`); + logger.debug(`[Rule Registry] Could not find alert data for ${alertId}`); } const isNew = !state.trackedAlerts[alertId]; @@ -283,7 +258,7 @@ export const createLifecycleExecutor = const allEventsToIndex = [...trackedEventsToIndex, ...newEventsToIndex]; if (allEventsToIndex.length > 0 && ruleDataClient.isWriteEnabled()) { - logger.debug(`Preparing to index ${allEventsToIndex.length} alerts.`); + logger.debug(`[Rule Registry] Preparing to index ${allEventsToIndex.length} alerts.`); await ruleDataClient.getWriter().bulk({ body: allEventsToIndex.flatMap(({ event, indexName }) => [ diff --git a/x-pack/plugins/rule_registry/server/utils/fetch_existing_alerts.ts b/x-pack/plugins/rule_registry/server/utils/fetch_existing_alerts.ts new file mode 100644 index 000000000000..892e237f8e30 --- /dev/null +++ b/x-pack/plugins/rule_registry/server/utils/fetch_existing_alerts.ts @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { chunk } from 'lodash'; +import { PublicContract } from '@kbn/utility-types'; +import { IRuleDataClient } from '../rule_data_client'; +import { + ALERT_RULE_UUID, + ALERT_UUID, + TIMESTAMP, +} from '../../common/technical_rule_data_field_names'; + +const CHUNK_SIZE = 10000; + +interface TrackedAlertState { + alertId: string; + alertUuid: string; + started: string; +} +type RuleDataClient = PublicContract; + +const fetchAlertsForStates = async ( + ruleDataClient: RuleDataClient, + states: TrackedAlertState[], + commonRuleFields: any +) => { + const request = { + body: { + query: { + bool: { + filter: [ + { + term: { + [ALERT_RULE_UUID]: commonRuleFields[ALERT_RULE_UUID], + }, + }, + { + terms: { + [ALERT_UUID]: states.map((state) => state.alertUuid), + }, + }, + ], + }, + }, + size: states.length, + collapse: { + field: ALERT_UUID, + }, + sort: { + [TIMESTAMP]: 'desc' as const, + }, + }, + allow_no_indices: true, + } as any; + const { hits } = await ruleDataClient.getReader().search(request); + return hits.hits; +}; + +export const fetchExistingAlerts = async ( + ruleDataClient: RuleDataClient, + trackedAlertStates: TrackedAlertState[], + commonRuleFields: any +) => { + const results = await Promise.all( + chunk(trackedAlertStates, CHUNK_SIZE).map((states) => + fetchAlertsForStates(ruleDataClient, states, commonRuleFields) + ) + ); + return results.flat(); +};