Skip to content

Commit

Permalink
Fetch rule actions in chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
xcrzx committed Dec 15, 2021
1 parent 442ccad commit c8e9036
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { findRules } from '../../rules/find_rules';
import { buildSiemResponse } from '../utils';
import { buildRouteValidation } from '../../../../utils/build_validation/route_validation';
import { transformFindAlerts } from './utils';
import { getCurrentRuleStatuses } from './utils/get_current_rule_statuses';

// eslint-disable-next-line no-restricted-imports
import { legacyGetBulkRuleActionsSavedObject } from '../../rule_actions/legacy_get_bulk_rule_actions_saved_object';
Expand Down Expand Up @@ -66,14 +67,12 @@ export const findRulesRoute = (
filter: query.filter,
fields: query.fields,
});
const alertIds = rules.data.map((rule) => rule.id);
const ruleIds = rules.data.map((rule) => rule.id);

const spaceId = context.securitySolution.getSpaceId();
const [currentStatusesByRuleId, ruleActions] = await Promise.all([
execLogClient.getCurrentStatusBulk({
ruleIds: alertIds,
spaceId: context.securitySolution.getSpaceId(),
}),
legacyGetBulkRuleActionsSavedObject({ alertIds, savedObjectsClient, logger }),
getCurrentRuleStatuses({ ruleIds, execLogClient, spaceId, logger }),
legacyGetBulkRuleActionsSavedObject({ alertIds: ruleIds, savedObjectsClient, logger }),
]);
const transformed = transformFindAlerts(rules, currentStatusesByRuleId, ruleActions);
if (transformed == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { chunk } from 'lodash';
import { Logger } from 'src/core/server';
import { initPromisePool } from '../../../../../utils/promise_pool';
import { GetCurrentStatusBulkResult, IRuleExecutionLogClient } from '../../../rule_execution_log';

const RULES_PER_CHUNK = 1000;

interface GetCurrentRuleStatusesArgs {
ruleIds: string[];
execLogClient: IRuleExecutionLogClient;
spaceId: string;
logger: Logger;
}

/**
* Get the most recent execution status for each of the given rule IDs.
* This method splits work into chunks so not to owerwhelm Elasticsearch
* when fetching statuses for a big number of rules.
*
* @param ruleIds Rule IDs to fetch statuses for
* @param execLogClient RuleExecutionLogClient
* @param spaceId Current Space ID
* @param logger Logger
* @returns A dict with rule IDs as keys and rule statuses as values
*
* @throws AggregateError if any of the rule status requests fail
*/
export async function getCurrentRuleStatuses({
ruleIds,
execLogClient,
spaceId,
logger,
}: GetCurrentRuleStatusesArgs): Promise<GetCurrentStatusBulkResult> {
const { results, errors } = await initPromisePool({
concurrency: 1,
items: chunk(ruleIds, RULES_PER_CHUNK),
executor: (ruleIdsChunk) =>
execLogClient
.getCurrentStatusBulk({
ruleIds: ruleIdsChunk,
spaceId,
})
.catch((error) => {
logger.error(
`Error fetching rule status: ${error instanceof Error ? error.message : String(error)}`
);
throw error;
}),
});

if (errors.length) {
throw new AggregateError(errors, 'Error fetching rule statuses');
}

// Merge all rule statuses into a single dict
return Object.assign({}, ...results);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* 2.0.
*/

import { chunk } from 'lodash';
import { SavedObjectsFindOptionsReference } from 'kibana/server';
import { Logger } from 'src/core/server';

Expand All @@ -17,6 +18,7 @@ import { LegacyIRuleActionsAttributesSavedObjectAttributes } from './legacy_type
import { legacyGetRuleActionsFromSavedObject } from './legacy_utils';
// eslint-disable-next-line no-restricted-imports
import { LegacyRulesActionsSavedObject } from './legacy_get_rule_actions_saved_object';
import { initPromisePool } from '../../../utils/promise_pool';

/**
* @deprecated Once we are confident all rules relying on side-car actions SO's have been migrated to SO references we should remove this function
Expand All @@ -39,15 +41,29 @@ export const legacyGetBulkRuleActionsSavedObject = async ({
id: alertId,
type: 'alert',
}));
const {
// eslint-disable-next-line @typescript-eslint/naming-convention
saved_objects,
} = await savedObjectsClient.find<LegacyIRuleActionsAttributesSavedObjectAttributes>({
type: legacyRuleActionsSavedObjectType,
perPage: 10000,
hasReference: references,
const { results, errors } = await initPromisePool({
concurrency: 1,
items: chunk(references, 1000),
executor: (referencesChunk) =>
savedObjectsClient
.find<LegacyIRuleActionsAttributesSavedObjectAttributes>({
type: legacyRuleActionsSavedObjectType,
perPage: 10000,
hasReference: referencesChunk,
})
.catch((error) => {
logger.error(
`Error fetching rule actions: ${error instanceof Error ? error.message : String(error)}`
);
throw error;
}),
});
return saved_objects.reduce(
if (errors.length) {
throw new AggregateError(errors, 'Error fetching rule actions');
}

const savedObjects = results.flatMap((result) => result.saved_objects);
return savedObjects.reduce(
(acc: { [key: string]: LegacyRulesActionsSavedObject }, savedObject) => {
const ruleAlertId = savedObject.references.find((reference) => {
// Find the first rule alert and assume that is the one we want since we should only ever have 1.
Expand Down
174 changes: 174 additions & 0 deletions x-pack/plugins/security_solution/server/utils/promise_pool.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { initPromisePool } from './promise_pool';

const nextTick = () => new Promise((resolve) => setImmediate(resolve));

const initPoolWithTasks = ({ concurrency = 1, items = [1, 2, 3] }) => {
const asyncTasks: Record<
number,
{
status: 'pending' | 'resolved' | 'rejected';
resolve: () => void;
reject: () => void;
}
> = {};

const promisePool = initPromisePool({
concurrency,
items,
executor: async (x) =>
new Promise((resolve, reject) => {
asyncTasks[x] = {
status: 'pending',
resolve: () => {
asyncTasks[x].status = 'resolved';
resolve(x);
},
reject: () => {
asyncTasks[x].status = 'rejected';
reject(new Error(`Error processing ${x}`));
},
};
}),
});

return [promisePool, asyncTasks] as const;
};

describe('initPromisePool', () => {
it('should execute async tasks', async () => {
const { results, errors } = await initPromisePool({
concurrency: 1,
items: [1, 2, 3],
executor: async (x) => x,
});

expect(results).toEqual([1, 2, 3]);
expect(errors).toEqual([]);
});

it('should capture any errors that occur during tasks execution', async () => {
const { results, errors } = await initPromisePool({
concurrency: 1,
items: [1, 2, 3],
executor: async (x) => {
throw new Error(`Error processing ${x}`);
},
});

expect(results).toEqual([]);
expect(errors).toEqual([
new Error(`Error processing 1`),
new Error(`Error processing 2`),
new Error(`Error processing 3`),
]);
});

it('should respect concurrency', async () => {
const [promisePool, asyncTasks] = initPoolWithTasks({
concurrency: 1,
items: [1, 2, 3],
});

// Check that we have only one task pending initially as concurrency = 1
expect(asyncTasks).toEqual({
1: expect.objectContaining({ status: 'pending' }),
});

asyncTasks[1].resolve();
await nextTick();

// Check that after resolving the first task, the second is pending
expect(asyncTasks).toEqual({
1: expect.objectContaining({ status: 'resolved' }),
2: expect.objectContaining({ status: 'pending' }),
});

asyncTasks[2].reject();
await nextTick();

// Check that after rejecting the second task, the third is pending
expect(asyncTasks).toEqual({
1: expect.objectContaining({ status: 'resolved' }),
2: expect.objectContaining({ status: 'rejected' }),
3: expect.objectContaining({ status: 'pending' }),
});

asyncTasks[3].resolve();
await nextTick();

// Check that all taks have been settled
expect(asyncTasks).toEqual({
1: expect.objectContaining({ status: 'resolved' }),
2: expect.objectContaining({ status: 'rejected' }),
3: expect.objectContaining({ status: 'resolved' }),
});

const { results, errors } = await promisePool;

// Check final reesuts
expect(results).toEqual([1, 3]);
expect(errors).toEqual([new Error(`Error processing 2`)]);
});

it('should be possible to configure concurrency', async () => {
const [promisePool, asyncTasks] = initPoolWithTasks({
concurrency: 2,
items: [1, 2, 3, 4, 5],
});

// Check that we have only two tasks pending initially as concurrency = 2
expect(asyncTasks).toEqual({
1: expect.objectContaining({ status: 'pending' }),
2: expect.objectContaining({ status: 'pending' }),
});

asyncTasks[1].resolve();
await nextTick();

// Check that after resolving the first task, the second and the third is pending
expect(asyncTasks).toEqual({
1: expect.objectContaining({ status: 'resolved' }),
2: expect.objectContaining({ status: 'pending' }),
3: expect.objectContaining({ status: 'pending' }),
});

asyncTasks[2].reject();
asyncTasks[3].reject();
await nextTick();

// Check that after rejecting the second and the third tasks, the rest are pending
expect(asyncTasks).toEqual({
1: expect.objectContaining({ status: 'resolved' }),
2: expect.objectContaining({ status: 'rejected' }),
3: expect.objectContaining({ status: 'rejected' }),
4: expect.objectContaining({ status: 'pending' }),
5: expect.objectContaining({ status: 'pending' }),
});

asyncTasks[4].resolve();
asyncTasks[5].resolve();
await nextTick();

// Check that all taks have been settled
expect(asyncTasks).toEqual({
1: expect.objectContaining({ status: 'resolved' }),
2: expect.objectContaining({ status: 'rejected' }),
3: expect.objectContaining({ status: 'rejected' }),
4: expect.objectContaining({ status: 'resolved' }),
5: expect.objectContaining({ status: 'resolved' }),
});

const { results, errors } = await promisePool;

// Check final reesuts
expect(results).toEqual([1, 4, 5]);
expect(errors).toEqual([new Error(`Error processing 2`), new Error(`Error processing 3`)]);
});
});
58 changes: 58 additions & 0 deletions x-pack/plugins/security_solution/server/utils/promise_pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

interface PromisePoolArgs<Item, Result> {
concurrency?: number;
items: Item[];
executor: (item: Item) => Promise<Result>;
}

/**
* Runs promises in batches. It ensures that the number of running async tasks
* doesn't exceed the concurrency parameter passed to the function.
*
* @param concurrency - number of tasks run in parallel
* @param items - array of items to be passes to async executor
* @param executor - an async function to be called with each provided item
*
* @returns Struct holding results or errors of async tasks
*/
export const initPromisePool = async <Item, Result>({
concurrency = 1,
items,
executor,
}: PromisePoolArgs<Item, Result>) => {
const tasks: Array<Promise<void>> = [];
const results: Result[] = [];
const errors: unknown[] = [];

for (const item of items) {
// Check if the pool is full
if (tasks.length >= concurrency) {
// Wait for any first task to finish
await Promise.race(tasks);
}

const task: Promise<void> = executor(item)
.then((result) => {
results.push(result);
})
.catch(async (error) => {
errors.push(error);
})
.finally(() => {
tasks.splice(tasks.indexOf(task), 1);
});

tasks.push(task);
}

// Wait for all remaining tasks to finish
await Promise.all(tasks);

return { results, errors };
};

0 comments on commit c8e9036

Please sign in to comment.