Skip to content

Commit

Permalink
[ML] Prevent duplicate notifications about the same anomaly result (#…
Browse files Browse the repository at this point in the history
…91485)

* [ML] check kibana event logs for existing alert instance

* [ML] create alert instance key, add check for alert id

* [ML] use anomaly_utils, check interval gap

* [ML] add detector index

* [ML] fix unit test

* [ML] include detector_index into source
  • Loading branch information
darnautov authored Feb 17, 2021
1 parent 06b8fb4 commit c84047b
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 39 deletions.
13 changes: 8 additions & 5 deletions x-pack/plugins/ml/common/types/alerts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export type TopHitsResultsKeys = 'top_record_hits' | 'top_bucket_hits' | 'top_in
export interface AlertExecutionResult {
count: number;
key: number;
key_as_string: string;
alertInstanceKey: string;
isInterim: boolean;
jobIds: string[];
timestamp: number;
Expand Down Expand Up @@ -47,10 +47,13 @@ interface BaseAnomalyAlertDoc {
export interface RecordAnomalyAlertDoc extends BaseAnomalyAlertDoc {
result_type: typeof ANOMALY_RESULT_TYPE.RECORD;
function: string;
field_name: string;
by_field_value: string | number;
over_field_value: string | number;
partition_field_value: string | number;
field_name?: string;
by_field_name?: string;
by_field_value?: string | number;
over_field_name?: string;
over_field_value?: string | number;
partition_field_name?: string;
partition_field_value?: string | number;
}

export interface BucketAnomalyAlertDoc extends BaseAnomalyAlertDoc {
Expand Down
4 changes: 0 additions & 4 deletions x-pack/plugins/ml/common/util/anomaly_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,6 @@ export function getEntityFieldName(record: AnomalyRecordDoc): string | undefined
if (record.partition_field_name !== undefined) {
return record.partition_field_name;
}

return undefined;
}

// Returns the value of the field to use as the entity value from the source record
Expand All @@ -249,8 +247,6 @@ export function getEntityFieldValue(record: AnomalyRecordDoc): string | number |
if (record.partition_field_value !== undefined) {
return record.partition_field_value;
}

return undefined;
}

// Returns the list of partitioning entity fields for the source record as a list
Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugins/ml/server/lib/alerts/alerting_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
* 2.0.
*/

import { resolveTimeInterval } from './alerting_service';
import { resolveBucketSpanInSeconds } from './alerting_service';

describe('Alerting Service', () => {
test('should resolve maximum bucket interval', () => {
expect(resolveTimeInterval(['15m', '1h', '6h', '90s'])).toBe('43200s');
expect(resolveBucketSpanInSeconds(['15m', '1h', '6h', '90s'])).toBe(43200);
});
});
143 changes: 121 additions & 22 deletions x-pack/plugins/ml/server/lib/alerts/alerting_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

import Boom from '@hapi/boom';
import rison from 'rison-node';
import { ElasticsearchClient } from 'kibana/server';
import moment from 'moment';
import { Duration } from 'moment/moment';
import { MlClient } from '../ml_client';
import {
MlAnomalyDetectionAlertParams,
Expand All @@ -25,6 +28,8 @@ import {
import { parseInterval } from '../../../common/util/parse_interval';
import { AnomalyDetectionAlertContext } from './register_anomaly_detection_alert_type';
import { MlJobsResponse } from '../../../common/types/job_service';
import { ANOMALY_SCORE_MATCH_GROUP_ID } from '../../../common/constants/alerts';
import { getEntityFieldName, getEntityFieldValue } from '../../../common/util/anomaly_utils';

function isDefined<T>(argument: T | undefined | null): argument is T {
return argument !== undefined && argument !== null;
Expand All @@ -34,22 +39,23 @@ function isDefined<T>(argument: T | undefined | null): argument is T {
* Resolves the longest bucket span from the list and multiplies it by 2.
* @param bucketSpans Collection of bucket spans
*/
export function resolveTimeInterval(bucketSpans: string[]): string {
return `${
export function resolveBucketSpanInSeconds(bucketSpans: string[]): number {
return (
Math.max(
...bucketSpans
.map((b) => parseInterval(b))
.filter(isDefined)
.map((v) => v.asSeconds())
) * 2
}s`;
);
}

/**
* Alerting related server-side methods
* @param mlClient
* @param esClient
*/
export function alertingServiceProvider(mlClient: MlClient) {
export function alertingServiceProvider(mlClient: MlClient, esClient: ElasticsearchClient) {
const getAggResultsLabel = (resultType: AnomalyResultType) => {
return {
aggGroupLabel: `${resultType}_results` as PreviewResultsKeys,
Expand Down Expand Up @@ -177,10 +183,14 @@ export function alertingServiceProvider(mlClient: MlClient) {
'is_interim',
'function',
'field_name',
'by_field_name',
'by_field_value',
'over_field_name',
'over_field_value',
'partition_field_name',
'partition_field_value',
'job_id',
'detector_index',
],
},
size: 3,
Expand Down Expand Up @@ -257,14 +267,31 @@ export function alertingServiceProvider(mlClient: MlClient) {
};
};

/**
 * Derives the key that identifies a single anomaly result.
 *
 * Every key starts with the job id and the timestamp of the result.
 * Influencer results additionally carry the influencer field name and value;
 * record results carry the detector index, the function and the entity field
 * name/value resolved via `getEntityFieldName` / `getEntityFieldValue`.
 * Any other result type gets no extra suffix.
 *
 * @param source - `_source` of the anomaly result document
 * @returns The key parts joined with underscores
 */
const getAlertInstanceKey = (source: any): string => {
  const keyParts = [source.job_id, source.timestamp];

  switch (source.result_type) {
    case ANOMALY_RESULT_TYPE.INFLUENCER:
      keyParts.push(source.influencer_field_name, source.influencer_field_value);
      break;
    case ANOMALY_RESULT_TYPE.RECORD: {
      const entityName = getEntityFieldName(source);
      const entityValue = getEntityFieldValue(source);
      keyParts.push(source.detector_index, source.function, entityName, entityValue);
      break;
    }
  }

  // Stringify each part through a template literal first: a bare `join` would
  // turn missing values into empty strings rather than the literal "undefined".
  return keyParts.map((part) => `${part}`).join('_');
};

/**
* Builds a request body
* @param params
* @param previewTimeInterval
* @param params - Alert params
* @param previewTimeInterval - Relative time interval to test the alert condition
* @param checkIntervalGap - Interval between alert executions
*/
const fetchAnomalies = async (
params: MlAnomalyDetectionAlertParams,
previewTimeInterval?: string
previewTimeInterval?: string,
checkIntervalGap?: Duration
): Promise<AlertExecutionResult[] | undefined> => {
const jobAndGroupIds = [
...(params.jobSelection.jobIds ?? []),
Expand All @@ -281,9 +308,14 @@ export function alertingServiceProvider(mlClient: MlClient) {
return;
}

const lookBackTimeInterval = resolveTimeInterval(
jobsResponse.map((v) => v.analysis_config.bucket_span)
);
/**
* The check interval might be bigger than the 2x bucket span.
* We need to check the biggest time range to make sure anomalies are not missed.
*/
const lookBackTimeInterval = `${Math.max(
resolveBucketSpanInSeconds(jobsResponse.map((v) => v.analysis_config.bucket_span)),
checkIntervalGap ? checkIntervalGap.asSeconds() : 0
)}s`;

const jobIds = jobsResponse.map((v) => v.job_id);

Expand Down Expand Up @@ -370,19 +402,22 @@ export function alertingServiceProvider(mlClient: MlClient) {
const aggTypeResults = v[resultsLabel.aggGroupLabel];
const requestedAnomalies = aggTypeResults[resultsLabel.topHitsLabel].hits.hits;

const topAnomaly = requestedAnomalies[0];
const alertInstanceKey = getAlertInstanceKey(topAnomaly._source);

return {
count: aggTypeResults.doc_count,
key: v.key,
key_as_string: v.key_as_string,
alertInstanceKey,
jobIds: [...new Set(requestedAnomalies.map((h) => h._source.job_id))],
isInterim: requestedAnomalies.some((h) => h._source.is_interim),
timestamp: requestedAnomalies[0]._source.timestamp,
timestampIso8601: requestedAnomalies[0].fields.timestamp_iso8601[0],
timestampEpoch: requestedAnomalies[0].fields.timestamp_epoch[0],
score: requestedAnomalies[0].fields.score[0],
timestamp: topAnomaly._source.timestamp,
timestampIso8601: topAnomaly.fields.timestamp_iso8601[0],
timestampEpoch: topAnomaly.fields.timestamp_epoch[0],
score: topAnomaly.fields.score[0],
bucketRange: {
start: requestedAnomalies[0].fields.start[0],
end: requestedAnomalies[0].fields.end[0],
start: topAnomaly.fields.start[0],
end: topAnomaly.fields.end[0],
},
topRecords: v.record_results.top_record_hits.hits.hits.map((h) => ({
...h._source,
Expand Down Expand Up @@ -479,13 +514,24 @@ export function alertingServiceProvider(mlClient: MlClient) {
/**
* Return the result of an alert condition execution.
*
* @param params
* @param params - Alert params
* @param publicBaseUrl
* @param alertId - Alert ID
* @param startedAt
* @param previousStartedAt
*/
execute: async (
params: MlAnomalyDetectionAlertParams,
publicBaseUrl: string | undefined
publicBaseUrl: string | undefined,
alertId: string,
startedAt: Date,
previousStartedAt: Date | null
): Promise<AnomalyDetectionAlertContext | undefined> => {
const res = await fetchAnomalies(params);
const checkIntervalGap = previousStartedAt
? moment.duration(moment(startedAt).diff(previousStartedAt))
: undefined;

const res = await fetchAnomalies(params, undefined, checkIntervalGap);

if (!res) {
throw new Error('No results found');
Expand All @@ -496,12 +542,65 @@ export function alertingServiceProvider(mlClient: MlClient) {

const anomalyExplorerUrl = buildExplorerUrl(result, params.resultType as AnomalyResultType);

return {
const executionResult = {
...result,
name: result.key_as_string,
name: result.alertInstanceKey,
anomalyExplorerUrl,
kibanaBaseUrl: publicBaseUrl!,
};

let kibanaEventLogCount = 0;
try {
// Check kibana-event-logs for presence of this alert instance
const kibanaLogResults = await esClient.count({
index: '.kibana-event-log-*',
body: {
query: {
bool: {
must: [
{
term: {
'kibana.alerting.action_group_id': {
value: ANOMALY_SCORE_MATCH_GROUP_ID,
},
},
},
{
term: {
'kibana.alerting.instance_id': {
value: executionResult.name,
},
},
},
{
nested: {
path: 'kibana.saved_objects',
query: {
term: {
'kibana.saved_objects.id': {
value: alertId,
},
},
},
},
},
],
},
},
},
});

kibanaEventLogCount = kibanaLogResults.body.count;
} catch (e) {
// eslint-disable-next-line no-console
console.log('Unable to check kibana event logs', e);
}

if (kibanaEventLogCount > 0) {
return;
}

return executionResult;
},
/**
* Checks how often the alert condition will fire an alert instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,19 @@ export function registerAnomalyDetectionAlertType({
},
producer: PLUGIN_ID,
minimumLicenseRequired: MINIMUM_FULL_LICENSE,
async executor({ services, params }) {
async executor({ services, params, alertId, state, previousStartedAt, startedAt }) {
const fakeRequest = {} as KibanaRequest;
const { execute } = mlSharedServices.alertingServiceProvider(
services.savedObjectsClient,
fakeRequest
);
const executionResult = await execute(params, publicBaseUrl);
const executionResult = await execute(
params,
publicBaseUrl,
alertId,
startedAt,
previousStartedAt
);

if (executionResult) {
const alertInstanceName = executionResult.name;
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/ml/server/lib/alerts/register_ml_alerts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
* 2.0.
*/

import { Logger } from 'kibana/server';
import { AlertingPlugin } from '../../../../alerts/server';
import { registerAnomalyDetectionAlertType } from './register_anomaly_detection_alert_type';
import { SharedServices } from '../../shared_services';

export interface RegisterAlertParams {
alerts: AlertingPlugin['setup'];
logger: Logger;
mlSharedServices: SharedServices;
publicBaseUrl: string | undefined;
}
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/ml/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ export class MlServerPlugin
if (plugins.alerts) {
registerMlAlerts({
alerts: plugins.alerts,
logger: this.log,
mlSharedServices: sharedServices,
publicBaseUrl: coreSetup.http.basePath.publicBaseUrl,
});
Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugins/ml/server/routes/alerting.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ export function alertingRoutes({ router, routeGuard }: RouteInitialization) {
tags: ['access:ml:canGetJobs'],
},
},
routeGuard.fullLicenseAPIGuard(async ({ mlClient, request, response }) => {
routeGuard.fullLicenseAPIGuard(async ({ mlClient, request, response, client }) => {
try {
const alertingService = alertingServiceProvider(mlClient);
const alertingService = alertingServiceProvider(mlClient, client.asInternalUser);

const result = await alertingService.preview(request.body);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ export function getAlertingServiceProvider(getGuards: GetGuards) {
return await getGuards(request, savedObjectsClient)
.isFullLicense()
.hasMlCapabilities(['canGetJobs'])
.ok(({ mlClient }) => alertingServiceProvider(mlClient).preview(...args));
.ok(({ mlClient, scopedClient }) =>
alertingServiceProvider(mlClient, scopedClient.asInternalUser).preview(...args)
);
},
execute: async (
...args: Parameters<MlAlertingService['execute']>
): ReturnType<MlAlertingService['execute']> => {
return await getGuards(request, savedObjectsClient)
.isFullLicense()
.hasMlCapabilities(['canGetJobs'])
.ok(({ mlClient }) => alertingServiceProvider(mlClient).execute(...args));
.ok(({ mlClient, scopedClient }) =>
alertingServiceProvider(mlClient, scopedClient.asInternalUser).execute(...args)
);
},
};
},
Expand Down

0 comments on commit c84047b

Please sign in to comment.