From 2fe9e2692a772123f6208505e3dfc1d28ee5be64 Mon Sep 17 00:00:00 2001 From: Dima Arnautov Date: Wed, 17 Feb 2021 19:26:23 +0100 Subject: [PATCH] [ML] Prevent duplicate notifications about the same anomaly result (#91485) * [ML] check kibana event logs for existing alert instance * [ML] create alert instance key, add check for alert id * [ML] use anomaly_utils, check interval gap * [ML] add detector index * [ML] fix unit test * [ML] include detector_index into source --- x-pack/plugins/ml/common/types/alerts.ts | 13 +- .../plugins/ml/common/util/anomaly_utils.ts | 4 - .../lib/alerts/alerting_service.test.ts | 4 +- .../ml/server/lib/alerts/alerting_service.ts | 143 +++++++++++++++--- .../register_anomaly_detection_alert_type.ts | 10 +- .../server/lib/alerts/register_ml_alerts.ts | 2 + x-pack/plugins/ml/server/plugin.ts | 1 + x-pack/plugins/ml/server/routes/alerting.ts | 4 +- .../providers/alerting_service.ts | 8 +- 9 files changed, 150 insertions(+), 39 deletions(-) diff --git a/x-pack/plugins/ml/common/types/alerts.ts b/x-pack/plugins/ml/common/types/alerts.ts index 51d06b990623..7e6e9d89c5a6 100644 --- a/x-pack/plugins/ml/common/types/alerts.ts +++ b/x-pack/plugins/ml/common/types/alerts.ts @@ -15,7 +15,7 @@ export type TopHitsResultsKeys = 'top_record_hits' | 'top_bucket_hits' | 'top_in export interface AlertExecutionResult { count: number; key: number; - key_as_string: string; + alertInstanceKey: string; isInterim: boolean; jobIds: string[]; timestamp: number; @@ -47,10 +47,13 @@ interface BaseAnomalyAlertDoc { export interface RecordAnomalyAlertDoc extends BaseAnomalyAlertDoc { result_type: typeof ANOMALY_RESULT_TYPE.RECORD; function: string; - field_name: string; - by_field_value: string | number; - over_field_value: string | number; - partition_field_value: string | number; + field_name?: string; + by_field_name?: string; + by_field_value?: string | number; + over_field_name?: string; + over_field_value?: string | number; + partition_field_name?: string; + 
partition_field_value?: string | number; } export interface BucketAnomalyAlertDoc extends BaseAnomalyAlertDoc { diff --git a/x-pack/plugins/ml/common/util/anomaly_utils.ts b/x-pack/plugins/ml/common/util/anomaly_utils.ts index 028afee2524c..68605f29c7be 100644 --- a/x-pack/plugins/ml/common/util/anomaly_utils.ts +++ b/x-pack/plugins/ml/common/util/anomaly_utils.ts @@ -230,8 +230,6 @@ export function getEntityFieldName(record: AnomalyRecordDoc): string | undefined if (record.partition_field_name !== undefined) { return record.partition_field_name; } - - return undefined; } // Returns the value of the field to use as the entity value from the source record @@ -249,8 +247,6 @@ export function getEntityFieldValue(record: AnomalyRecordDoc): string | number | if (record.partition_field_value !== undefined) { return record.partition_field_value; } - - return undefined; } // Returns the list of partitioning entity fields for the source record as a list diff --git a/x-pack/plugins/ml/server/lib/alerts/alerting_service.test.ts b/x-pack/plugins/ml/server/lib/alerts/alerting_service.test.ts index 261fac7b620b..f029fa24f960 100644 --- a/x-pack/plugins/ml/server/lib/alerts/alerting_service.test.ts +++ b/x-pack/plugins/ml/server/lib/alerts/alerting_service.test.ts @@ -5,10 +5,10 @@ * 2.0. 
*/ -import { resolveTimeInterval } from './alerting_service'; +import { resolveBucketSpanInSeconds } from './alerting_service'; describe('Alerting Service', () => { test('should resolve maximum bucket interval', () => { - expect(resolveTimeInterval(['15m', '1h', '6h', '90s'])).toBe('43200s'); + expect(resolveBucketSpanInSeconds(['15m', '1h', '6h', '90s'])).toBe(43200); }); }); diff --git a/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts b/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts index 5ef883cc50fb..6e7cd77e450b 100644 --- a/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts +++ b/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts @@ -7,6 +7,9 @@ import Boom from '@hapi/boom'; import rison from 'rison-node'; +import { ElasticsearchClient } from 'kibana/server'; +import moment from 'moment'; +import { Duration } from 'moment/moment'; import { MlClient } from '../ml_client'; import { MlAnomalyDetectionAlertParams, @@ -25,6 +28,8 @@ import { import { parseInterval } from '../../../common/util/parse_interval'; import { AnomalyDetectionAlertContext } from './register_anomaly_detection_alert_type'; import { MlJobsResponse } from '../../../common/types/job_service'; +import { ANOMALY_SCORE_MATCH_GROUP_ID } from '../../../common/constants/alerts'; +import { getEntityFieldName, getEntityFieldValue } from '../../../common/util/anomaly_utils'; function isDefined(argument: T | undefined | null): argument is T { return argument !== undefined && argument !== null; @@ -34,22 +39,23 @@ function isDefined(argument: T | undefined | null): argument is T { * Resolves the longest bucket span from the list and multiply it by 2. 
* @param bucketSpans Collection of bucket spans */ -export function resolveTimeInterval(bucketSpans: string[]): string { - return `${ +export function resolveBucketSpanInSeconds(bucketSpans: string[]): number { + return ( Math.max( ...bucketSpans .map((b) => parseInterval(b)) .filter(isDefined) .map((v) => v.asSeconds()) ) * 2 - }s`; + ); } /** * Alerting related server-side methods * @param mlClient + * @param esClient */ -export function alertingServiceProvider(mlClient: MlClient) { +export function alertingServiceProvider(mlClient: MlClient, esClient: ElasticsearchClient) { const getAggResultsLabel = (resultType: AnomalyResultType) => { return { aggGroupLabel: `${resultType}_results` as PreviewResultsKeys, @@ -177,10 +183,14 @@ export function alertingServiceProvider(mlClient: MlClient) { 'is_interim', 'function', 'field_name', + 'by_field_name', 'by_field_value', + 'over_field_name', 'over_field_value', + 'partition_field_name', 'partition_field_value', 'job_id', + 'detector_index', ], }, size: 3, @@ -257,14 +267,31 @@ export function alertingServiceProvider(mlClient: MlClient) { }; }; + /** + * Provides unique key for the anomaly result. 
+ */ + const getAlertInstanceKey = (source: any): string => { + let alertInstanceKey = `${source.job_id}_${source.timestamp}`; + if (source.result_type === ANOMALY_RESULT_TYPE.INFLUENCER) { + alertInstanceKey += `_${source.influencer_field_name}_${source.influencer_field_value}`; + } else if (source.result_type === ANOMALY_RESULT_TYPE.RECORD) { + const fieldName = getEntityFieldName(source); + const fieldValue = getEntityFieldValue(source); + alertInstanceKey += `_${source.detector_index}_${source.function}_${fieldName}_${fieldValue}`; + } + return alertInstanceKey; + }; + /** * Builds a request body - * @param params - * @param previewTimeInterval + * @param params - Alert params + * @param previewTimeInterval - Relative time interval to test the alert condition + * @param checkIntervalGap - Interval between alert executions */ const fetchAnomalies = async ( params: MlAnomalyDetectionAlertParams, - previewTimeInterval?: string + previewTimeInterval?: string, + checkIntervalGap?: Duration ): Promise => { const jobAndGroupIds = [ ...(params.jobSelection.jobIds ?? []), @@ -281,9 +308,14 @@ export function alertingServiceProvider(mlClient: MlClient) { return; } - const lookBackTimeInterval = resolveTimeInterval( - jobsResponse.map((v) => v.analysis_config.bucket_span) - ); + /** + * The check interval might be bigger than the 2x bucket span. + * We need to check the biggest time range to make sure anomalies are not missed. + */ + const lookBackTimeInterval = `${Math.max( + resolveBucketSpanInSeconds(jobsResponse.map((v) => v.analysis_config.bucket_span)), + checkIntervalGap ? 
checkIntervalGap.asSeconds() : 0 + )}s`; const jobIds = jobsResponse.map((v) => v.job_id); @@ -370,19 +402,22 @@ export function alertingServiceProvider(mlClient: MlClient) { const aggTypeResults = v[resultsLabel.aggGroupLabel]; const requestedAnomalies = aggTypeResults[resultsLabel.topHitsLabel].hits.hits; + const topAnomaly = requestedAnomalies[0]; + const alertInstanceKey = getAlertInstanceKey(topAnomaly._source); + return { count: aggTypeResults.doc_count, key: v.key, - key_as_string: v.key_as_string, + alertInstanceKey, jobIds: [...new Set(requestedAnomalies.map((h) => h._source.job_id))], isInterim: requestedAnomalies.some((h) => h._source.is_interim), - timestamp: requestedAnomalies[0]._source.timestamp, - timestampIso8601: requestedAnomalies[0].fields.timestamp_iso8601[0], - timestampEpoch: requestedAnomalies[0].fields.timestamp_epoch[0], - score: requestedAnomalies[0].fields.score[0], + timestamp: topAnomaly._source.timestamp, + timestampIso8601: topAnomaly.fields.timestamp_iso8601[0], + timestampEpoch: topAnomaly.fields.timestamp_epoch[0], + score: topAnomaly.fields.score[0], bucketRange: { - start: requestedAnomalies[0].fields.start[0], - end: requestedAnomalies[0].fields.end[0], + start: topAnomaly.fields.start[0], + end: topAnomaly.fields.end[0], }, topRecords: v.record_results.top_record_hits.hits.hits.map((h) => ({ ...h._source, @@ -479,13 +514,24 @@ export function alertingServiceProvider(mlClient: MlClient) { /** * Return the result of an alert condition execution. * - * @param params + * @param params - Alert params + * @param publicBaseUrl + * @param alertId - Alert ID + * @param startedAt + * @param previousStartedAt */ execute: async ( params: MlAnomalyDetectionAlertParams, - publicBaseUrl: string | undefined + publicBaseUrl: string | undefined, + alertId: string, + startedAt: Date, + previousStartedAt: Date | null ): Promise => { - const res = await fetchAnomalies(params); + const checkIntervalGap = previousStartedAt + ? 
moment.duration(moment(startedAt).diff(previousStartedAt)) + : undefined; + + const res = await fetchAnomalies(params, undefined, checkIntervalGap); if (!res) { throw new Error('No results found'); @@ -496,12 +542,65 @@ export function alertingServiceProvider(mlClient: MlClient) { const anomalyExplorerUrl = buildExplorerUrl(result, params.resultType as AnomalyResultType); - return { + const executionResult = { ...result, - name: result.key_as_string, + name: result.alertInstanceKey, anomalyExplorerUrl, kibanaBaseUrl: publicBaseUrl!, }; + + let kibanaEventLogCount = 0; + try { + // Check kibana-event-logs for presence of this alert instance + const kibanaLogResults = await esClient.count({ + index: '.kibana-event-log-*', + body: { + query: { + bool: { + must: [ + { + term: { + 'kibana.alerting.action_group_id': { + value: ANOMALY_SCORE_MATCH_GROUP_ID, + }, + }, + }, + { + term: { + 'kibana.alerting.instance_id': { + value: executionResult.name, + }, + }, + }, + { + nested: { + path: 'kibana.saved_objects', + query: { + term: { + 'kibana.saved_objects.id': { + value: alertId, + }, + }, + }, + }, + }, + ], + }, + }, + }, + }); + + kibanaEventLogCount = kibanaLogResults.body.count; + } catch (e) { + // eslint-disable-next-line no-console + console.log('Unable to check kibana event logs', e); + } + + if (kibanaEventLogCount > 0) { + return; + } + + return executionResult; }, /** * Checks how often the alert condition will fire an alert instance diff --git a/x-pack/plugins/ml/server/lib/alerts/register_anomaly_detection_alert_type.ts b/x-pack/plugins/ml/server/lib/alerts/register_anomaly_detection_alert_type.ts index 6f8fa59aa231..30a92c02cefc 100644 --- a/x-pack/plugins/ml/server/lib/alerts/register_anomaly_detection_alert_type.ts +++ b/x-pack/plugins/ml/server/lib/alerts/register_anomaly_detection_alert_type.ts @@ -123,13 +123,19 @@ export function registerAnomalyDetectionAlertType({ }, producer: PLUGIN_ID, minimumLicenseRequired: MINIMUM_FULL_LICENSE, - async 
executor({ services, params }) { + async executor({ services, params, alertId, state, previousStartedAt, startedAt }) { const fakeRequest = {} as KibanaRequest; const { execute } = mlSharedServices.alertingServiceProvider( services.savedObjectsClient, fakeRequest ); - const executionResult = await execute(params, publicBaseUrl); + const executionResult = await execute( + params, + publicBaseUrl, + alertId, + startedAt, + previousStartedAt + ); if (executionResult) { const alertInstanceName = executionResult.name; diff --git a/x-pack/plugins/ml/server/lib/alerts/register_ml_alerts.ts b/x-pack/plugins/ml/server/lib/alerts/register_ml_alerts.ts index 5c9106d78595..371c5435f91d 100644 --- a/x-pack/plugins/ml/server/lib/alerts/register_ml_alerts.ts +++ b/x-pack/plugins/ml/server/lib/alerts/register_ml_alerts.ts @@ -5,12 +5,14 @@ * 2.0. */ +import { Logger } from 'kibana/server'; import { AlertingPlugin } from '../../../../alerts/server'; import { registerAnomalyDetectionAlertType } from './register_anomaly_detection_alert_type'; import { SharedServices } from '../../shared_services'; export interface RegisterAlertParams { alerts: AlertingPlugin['setup']; + logger: Logger; mlSharedServices: SharedServices; publicBaseUrl: string | undefined; } diff --git a/x-pack/plugins/ml/server/plugin.ts b/x-pack/plugins/ml/server/plugin.ts index 10ed70d7f739..24fac9184cc2 100644 --- a/x-pack/plugins/ml/server/plugin.ts +++ b/x-pack/plugins/ml/server/plugin.ts @@ -211,6 +211,7 @@ export class MlServerPlugin if (plugins.alerts) { registerMlAlerts({ alerts: plugins.alerts, + logger: this.log, mlSharedServices: sharedServices, publicBaseUrl: coreSetup.http.basePath.publicBaseUrl, }); diff --git a/x-pack/plugins/ml/server/routes/alerting.ts b/x-pack/plugins/ml/server/routes/alerting.ts index 7b7f3a7db972..a268a5200b35 100644 --- a/x-pack/plugins/ml/server/routes/alerting.ts +++ b/x-pack/plugins/ml/server/routes/alerting.ts @@ -30,9 +30,9 @@ export function alertingRoutes({ router, 
routeGuard }: RouteInitialization) { tags: ['access:ml:canGetJobs'], }, }, - routeGuard.fullLicenseAPIGuard(async ({ mlClient, request, response }) => { + routeGuard.fullLicenseAPIGuard(async ({ mlClient, request, response, client }) => { try { - const alertingService = alertingServiceProvider(mlClient); + const alertingService = alertingServiceProvider(mlClient, client.asInternalUser); const result = await alertingService.preview(request.body); diff --git a/x-pack/plugins/ml/server/shared_services/providers/alerting_service.ts b/x-pack/plugins/ml/server/shared_services/providers/alerting_service.ts index 318dac200a87..cbe22478e12d 100644 --- a/x-pack/plugins/ml/server/shared_services/providers/alerting_service.ts +++ b/x-pack/plugins/ml/server/shared_services/providers/alerting_service.ts @@ -20,7 +20,9 @@ export function getAlertingServiceProvider(getGuards: GetGuards) { return await getGuards(request, savedObjectsClient) .isFullLicense() .hasMlCapabilities(['canGetJobs']) - .ok(({ mlClient }) => alertingServiceProvider(mlClient).preview(...args)); + .ok(({ mlClient, scopedClient }) => + alertingServiceProvider(mlClient, scopedClient.asInternalUser).preview(...args) + ); }, execute: async ( ...args: Parameters @@ -28,7 +30,9 @@ export function getAlertingServiceProvider(getGuards: GetGuards) { return await getGuards(request, savedObjectsClient) .isFullLicense() .hasMlCapabilities(['canGetJobs']) - .ok(({ mlClient }) => alertingServiceProvider(mlClient).execute(...args)); + .ok(({ mlClient, scopedClient }) => + alertingServiceProvider(mlClient, scopedClient.asInternalUser).execute(...args) + ); }, }; },