Skip to content

Commit

Permalink
[RAC][Rule Registry] Paginate results for fetching existing alerts (e…
Browse files Browse the repository at this point in the history
…lastic#122474) (elastic#126516)

* [RAC][Rule Registry] Paginate results for fetching existing alerts

* Change to difference to increase performance by 2 seconds for 50K alerts

* Changing the pagination to break up the request into 10K chunks

* Updating NOTICE.txt per CI instructions

* Changing warning message to debug

* Prefix log message with [Rule Registry]

(cherry picked from commit 0236c8a)

# Conflicts:
#	x-pack/plugins/rule_registry/server/utils/create_lifecycle_executor.ts
  • Loading branch information
simianhacker authored Feb 28, 2022
1 parent 599f75a commit e2bd785
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type { PublicContract } from '@kbn/utility-types';
import { getOrElse } from 'fp-ts/lib/Either';
import * as rt from 'io-ts';
import { v4 } from 'uuid';
import { difference } from 'lodash';
import {
AlertExecutorOptions,
AlertInstance,
Expand All @@ -24,7 +25,6 @@ import {
ALERT_DURATION,
ALERT_END,
ALERT_INSTANCE_ID,
ALERT_RULE_UUID,
ALERT_START,
ALERT_STATUS,
ALERT_STATUS_ACTIVE,
Expand All @@ -39,6 +39,7 @@ import {
} from '../../common/technical_rule_data_field_names';
import { IRuleDataClient } from '../rule_data_client';
import { AlertExecutorOptionsWithExtraServices } from '../types';
import { fetchExistingAlerts } from './fetch_existing_alerts';
import {
CommonAlertFieldName,
CommonAlertIdFieldName,
Expand Down Expand Up @@ -179,13 +180,13 @@ export const createLifecycleExecutor =

const currentAlertIds = Object.keys(currentAlerts);
const trackedAlertIds = Object.keys(state.trackedAlerts);
const newAlertIds = currentAlertIds.filter((alertId) => !trackedAlertIds.includes(alertId));
const newAlertIds = difference(currentAlertIds, trackedAlertIds);
const allAlertIds = [...new Set(currentAlertIds.concat(trackedAlertIds))];

const trackedAlertStates = Object.values(state.trackedAlerts);

logger.debug(
`Tracking ${allAlertIds.length} alerts (${newAlertIds.length} new, ${trackedAlertStates.length} previous)`
`[Rule Registry] Tracking ${allAlertIds.length} alerts (${newAlertIds.length} new, ${trackedAlertStates.length} previous)`
);

const trackedAlertsDataMap: Record<
Expand All @@ -194,40 +195,14 @@ export const createLifecycleExecutor =
> = {};

if (trackedAlertStates.length) {
const { hits } = await ruleDataClient.getReader().search({
body: {
query: {
bool: {
filter: [
{
term: {
[ALERT_RULE_UUID]: commonRuleFields[ALERT_RULE_UUID],
},
},
{
terms: {
[ALERT_UUID]: trackedAlertStates.map(
(trackedAlertState) => trackedAlertState.alertUuid
),
},
},
],
},
},
size: trackedAlertStates.length,
collapse: {
field: ALERT_UUID,
},
sort: {
[TIMESTAMP]: 'desc' as const,
},
},
allow_no_indices: true,
});

hits.hits.forEach((hit) => {
const alertId = hit._source[ALERT_INSTANCE_ID];
if (alertId) {
const result = await fetchExistingAlerts(
ruleDataClient,
trackedAlertStates,
commonRuleFields
);
result.forEach((hit) => {
const alertId = hit._source ? hit._source[ALERT_INSTANCE_ID] : void 0;
if (alertId && hit._source) {
trackedAlertsDataMap[alertId] = {
indexName: hit._index,
fields: hit._source,
Expand All @@ -242,7 +217,7 @@ export const createLifecycleExecutor =
const currentAlertData = currentAlerts[alertId];

if (!alertData) {
logger.warn(`Could not find alert data for ${alertId}`);
logger.debug(`[Rule Registry] Could not find alert data for ${alertId}`);
}

const isNew = !state.trackedAlerts[alertId];
Expand Down Expand Up @@ -283,7 +258,7 @@ export const createLifecycleExecutor =
const allEventsToIndex = [...trackedEventsToIndex, ...newEventsToIndex];

if (allEventsToIndex.length > 0 && ruleDataClient.isWriteEnabled()) {
logger.debug(`Preparing to index ${allEventsToIndex.length} alerts.`);
logger.debug(`[Rule Registry] Preparing to index ${allEventsToIndex.length} alerts.`);

await ruleDataClient.getWriter().bulk({
body: allEventsToIndex.flatMap(({ event, indexName }) => [
Expand Down
74 changes: 74 additions & 0 deletions x-pack/plugins/rule_registry/server/utils/fetch_existing_alerts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { chunk } from 'lodash';
import { PublicContract } from '@kbn/utility-types';
import { IRuleDataClient } from '../rule_data_client';
import {
ALERT_RULE_UUID,
ALERT_UUID,
TIMESTAMP,
} from '../../common/technical_rule_data_field_names';

const CHUNK_SIZE = 10000;

interface TrackedAlertState {
alertId: string;
alertUuid: string;
started: string;
}
type RuleDataClient = PublicContract<IRuleDataClient>;

const fetchAlertsForStates = async (
ruleDataClient: RuleDataClient,
states: TrackedAlertState[],
commonRuleFields: any
) => {
const request = {
body: {
query: {
bool: {
filter: [
{
term: {
[ALERT_RULE_UUID]: commonRuleFields[ALERT_RULE_UUID],
},
},
{
terms: {
[ALERT_UUID]: states.map((state) => state.alertUuid),
},
},
],
},
},
size: states.length,
collapse: {
field: ALERT_UUID,
},
sort: {
[TIMESTAMP]: 'desc' as const,
},
},
allow_no_indices: true,
} as any;
const { hits } = await ruleDataClient.getReader().search(request);
return hits.hits;
};

export const fetchExistingAlerts = async (
ruleDataClient: RuleDataClient,
trackedAlertStates: TrackedAlertState[],
commonRuleFields: any
) => {
const results = await Promise.all(
chunk(trackedAlertStates, CHUNK_SIZE).map((states) =>
fetchAlertsForStates(ruleDataClient, states, commonRuleFields)
)
);
return results.flat();
};

0 comments on commit e2bd785

Please sign in to comment.