Skip to content

Commit

Permalink
[Metrics UI] Increase composite size to 10K for Metric Threshold Rule…
Browse files Browse the repository at this point in the history
… and optimize processing (elastic#121904) (elastic#126506)

* [Metrics UI] Increase composite size for Metric Threshold Rule to 10K

* Adding performance optimizations

* Fixing metrics_alerting integration test

* fixing tests

* Fixing integration test and config mock

* Removing the setTimeout code to simplify to a for/of

* Adding new setting to docs

* Adding metric_threshold identifier to the config setting

(cherry picked from commit ae0c8d5)

# Conflicts:
#	x-pack/plugins/infra/server/lib/alerting/metric_threshold/lib/evaluate_rule.ts
  • Loading branch information
simianhacker authored Feb 28, 2022
1 parent 05c0327 commit 599f75a
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 53 deletions.
5 changes: 4 additions & 1 deletion docs/settings/general-infra-logs-ui-settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,7 @@ Field used to identify hosts. Defaults to `host.name`.
Field used to identify Docker containers. Defaults to `container.id`.

`xpack.infra.sources.default.fields.pod`::
Field used to identify Kubernetes pods. Defaults to `kubernetes.pod.uid`.
Field used to identify Kubernetes pods. Defaults to `kubernetes.pod.uid`.

`xpack.infra.alerting.metric_threshold.group_by_page_size`::
Controls the size of the composite aggregations used by the Metric Threshold group by feature. Defaults to `10000`.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
*/

import { ElasticsearchClient } from 'kibana/server';
import { first, has, isNaN, isNumber, isObject, last, mapValues } from 'lodash';
import moment from 'moment';
import { difference, mapValues, first, last, isNaN, isNumber, isObject, has } from 'lodash';
import {
Aggregators,
Comparator,
Expand Down Expand Up @@ -68,6 +68,7 @@ export const evaluateRule = <Params extends EvaluatedRuleParams = EvaluatedRuleP
params: Params,
config: InfraSource['configuration'],
prevGroups: string[],
compositeSize: number,
timeframe?: { start?: number; end: number }
) => {
const { criteria, groupBy, filterQuery, shouldDropPartialBuckets } = params;
Expand All @@ -80,6 +81,7 @@ export const evaluateRule = <Params extends EvaluatedRuleParams = EvaluatedRuleP
config.metricAlias,
groupBy,
filterQuery,
compositeSize,
timeframe,
shouldDropPartialBuckets
);
Expand All @@ -98,27 +100,22 @@ export const evaluateRule = <Params extends EvaluatedRuleParams = EvaluatedRuleP
// If any previous groups are no longer being reported, backfill them with null values
const currentGroups = Object.keys(currentValues);

const missingGroups = prevGroups.filter((g) => !currentGroups.includes(g));
const missingGroups = difference(prevGroups, currentGroups);

if (currentGroups.length === 0 && missingGroups.length === 0) {
missingGroups.push(UNGROUPED_FACTORY_KEY);
}
const backfillTimestamp =
last(last(Object.values(currentValues)))?.key ?? new Date().toISOString();
const backfilledPrevGroups: Record<
string,
Array<{ key: string; value: number }>
> = missingGroups.reduce(
(result, group) => ({
...result,
[group]: [
{
key: backfillTimestamp,
value: criterion.aggType === Aggregators.COUNT ? 0 : null,
},
],
}),
{}
);
const backfilledPrevGroups: Record<string, Array<{ key: string; value: number | null }>> = {};
for (const group of missingGroups) {
backfilledPrevGroups[group] = [
{
key: backfillTimestamp,
value: criterion.aggType === Aggregators.COUNT ? 0 : null,
},
];
}
const currentValuesWithBackfilledPrevGroups = {
...currentValues,
...backfilledPrevGroups,
Expand Down Expand Up @@ -152,6 +149,7 @@ const getMetric: (
index: string,
groupBy: string | undefined | string[],
filterQuery: string | undefined,
compositeSize: number,
timeframe?: { start?: number; end: number },
shouldDropPartialBuckets?: boolean
) => Promise<Record<string, Array<{ key: string; value: number }>>> = async function (
Expand All @@ -160,6 +158,7 @@ const getMetric: (
index,
groupBy,
filterQuery,
compositeSize,
timeframe,
shouldDropPartialBuckets
) {
Expand All @@ -174,6 +173,7 @@ const getMetric: (
const searchBody = getElasticsearchMetricQuery(
params,
calculatedTimerange,
compositeSize,
hasGroupBy ? groupBy : undefined,
filterQuery
);
Expand Down Expand Up @@ -204,21 +204,18 @@ const getMetric: (
bucketSelector,
afterKeyHandler
)) as Array<Aggregation & { key: Record<string, string>; doc_count: number }>;
const groupedResults = compositeBuckets.reduce(
(result, bucket) => ({
...result,
[Object.values(bucket.key)
.map((value) => value)
.join(', ')]: getValuesFromAggregations(
bucket,
aggType,
dropPartialBucketsOptions,
calculatedTimerange,
bucket.doc_count
),
}),
{}
);
const groupedResults: Record<string, any> = {};
for (const bucket of compositeBuckets) {
const key = Object.values(bucket.key).join(', ');
const value = getValuesFromAggregations(
bucket,
aggType,
dropPartialBucketsOptions,
calculatedTimerange,
bucket.doc_count
);
groupedResults[key] = value;
}
return groupedResults;
}
const { body: result } = await esClient.search({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe("The Metric Threshold Alert's getElasticsearchMetricQuery", () => {
};

describe('when passed no filterQuery', () => {
const searchBody = getElasticsearchMetricQuery(expressionParams, timeframe, groupBy);
const searchBody = getElasticsearchMetricQuery(expressionParams, timeframe, 100, groupBy);
test('includes a range filter', () => {
expect(
searchBody.query.bool.filter.find((filter) => filter.hasOwnProperty('range'))
Expand All @@ -47,6 +47,7 @@ describe("The Metric Threshold Alert's getElasticsearchMetricQuery", () => {
const searchBody = getElasticsearchMetricQuery(
expressionParams,
timeframe,
100,
groupBy,
filterQuery
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import { networkTraffic } from '../../../../../common/inventory_models/shared/me
import { calculateDateHistogramOffset } from '../../../metrics/lib/calculate_date_histogram_offset';
import { createPercentileAggregation } from './create_percentile_aggregation';

const COMPOSITE_RESULTS_PER_PAGE = 100;

const getParsedFilterQuery: (filterQuery: string | undefined) => Record<string, any> | null = (
filterQuery
) => {
Expand All @@ -23,6 +21,7 @@ const getParsedFilterQuery: (filterQuery: string | undefined) => Record<string,
export const getElasticsearchMetricQuery = (
{ metric, aggType, timeUnit, timeSize }: MetricExpressionParams,
timeframe: { start: number; end: number },
compositeSize: number,
groupBy?: string | string[],
filterQuery?: string
) => {
Expand Down Expand Up @@ -73,7 +72,7 @@ export const getElasticsearchMetricQuery = (
? {
groupings: {
composite: {
size: COMPOSITE_RESULTS_PER_PAGE,
size: compositeSize,
sources: Array.isArray(groupBy)
? groupBy.map((field, index) => ({
[`groupBy${index}`]: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,11 @@ describe('The metric threshold alert type', () => {
});

const createMockStaticConfiguration = (sources: any) => ({
alerting: {
metric_threshold: {
group_by_page_size: 100,
},
},
inventory: {
compositeSize: 2000,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ export const createMetricThresholdExecutor = (libs: InfraBackendLibs) =>
sourceId || 'default'
);
const config = source.configuration;
const compositeSize = libs.configuration.alerting.metric_threshold.group_by_page_size;

const previousGroupBy = state.groupBy;
const previousFilterQuery = state.filterQuery;
Expand All @@ -135,7 +136,8 @@ export const createMetricThresholdExecutor = (libs: InfraBackendLibs) =>
services.scopedClusterClient.asCurrentUser,
params as EvaluatedRuleParams,
config,
prevGroups
prevGroups,
compositeSize
);

// Because each alert result has the same group definitions, just grab the groups from the first one.
Expand Down Expand Up @@ -248,7 +250,6 @@ export const createMetricThresholdExecutor = (libs: InfraBackendLibs) =>
});
}
}

return { groups, groupBy: params.groupBy, filterQuery: params.filterQuery };
});

Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/infra/server/lib/infra_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import { handleEsError } from '../../../../../src/plugins/es_ui_shared/server';
import { InfraConfig } from '../plugin';
import { InfraConfig } from '../types';
import { GetLogQueryFields } from '../services/log_queries/get_log_query_fields';
import { RulesServiceSetup } from '../services/rules';
import { KibanaFramework } from './adapters/framework/kibana_framework_adapter';
Expand Down
5 changes: 5 additions & 0 deletions x-pack/plugins/infra/server/lib/sources/sources.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ describe('the InfraSources lib', () => {
});

const createMockStaticConfiguration = (sources: any) => ({
alerting: {
metric_threshold: {
group_by_page_size: 10000,
},
},
enabled: true,
inventory: {
compositeSize: 2000,
Expand Down
13 changes: 9 additions & 4 deletions x-pack/plugins/infra/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import { Server } from '@hapi/hapi';
import { schema, TypeOf } from '@kbn/config-schema';
import { schema } from '@kbn/config-schema';
import { i18n } from '@kbn/i18n';
import { Logger } from '@kbn/logging';
import {
Expand Down Expand Up @@ -35,15 +35,20 @@ import { InfraBackendLibs, InfraDomainLibs } from './lib/infra_types';
import { infraSourceConfigurationSavedObjectType, InfraSources } from './lib/sources';
import { InfraSourceStatus } from './lib/source_status';
import { LogEntriesService } from './services/log_entries';
import { InfraPluginRequestHandlerContext } from './types';
import { InfraPluginRequestHandlerContext, InfraConfig } from './types';
import { UsageCollector } from './usage/usage_collector';
import { createGetLogQueryFields } from './services/log_queries/get_log_query_fields';
import { handleEsError } from '../../../../src/plugins/es_ui_shared/server';
import { RulesService } from './services/rules';
import { configDeprecations, getInfraDeprecationsFactory } from './deprecations';

export const config: PluginConfigDescriptor = {
export const config: PluginConfigDescriptor<InfraConfig> = {
schema: schema.object({
alerting: schema.object({
metric_threshold: schema.object({
group_by_page_size: schema.number({ defaultValue: 10000 }),
}),
}),
inventory: schema.object({
compositeSize: schema.number({ defaultValue: 2000 }),
}),
Expand All @@ -64,7 +69,7 @@ export const config: PluginConfigDescriptor = {
deprecations: configDeprecations,
};

export type InfraConfig = TypeOf<typeof config.schema>;
export type { InfraConfig };

export interface KbnServer extends Server {
usage: any;
Expand Down
18 changes: 18 additions & 0 deletions x-pack/plugins/infra/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,21 @@ export interface InfraPluginRequestHandlerContext extends RequestHandlerContext
infra: InfraRequestHandlerContext;
search: SearchRequestHandlerContext;
}

export interface InfraConfig {
alerting: {
metric_threshold: {
group_by_page_size: number;
};
};
inventory: {
compositeSize: number;
};
sources?: {
default?: {
fields?: {
message?: string[];
};
};
};
}
Loading

0 comments on commit 599f75a

Please sign in to comment.