From 897415dbfa36201cd3a23a04232245930a9fcfaa Mon Sep 17 00:00:00 2001 From: Dima Arnautov Date: Tue, 16 Feb 2021 12:05:45 +0100 Subject: [PATCH 1/6] [ML] check kibana even logs for existing alert instance --- .../ml/server/lib/alerts/alerting_service.ts | 48 ++++++++++++++++++- x-pack/plugins/ml/server/routes/alerting.ts | 4 +- .../providers/alerting_service.ts | 8 +++- 3 files changed, 54 insertions(+), 6 deletions(-) diff --git a/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts b/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts index 5ef883cc50fb..e30cc997343b 100644 --- a/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts +++ b/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts @@ -7,6 +7,7 @@ import Boom from '@hapi/boom'; import rison from 'rison-node'; +import { ElasticsearchClient } from 'kibana/server'; import { MlClient } from '../ml_client'; import { MlAnomalyDetectionAlertParams, @@ -25,6 +26,7 @@ import { import { parseInterval } from '../../../common/util/parse_interval'; import { AnomalyDetectionAlertContext } from './register_anomaly_detection_alert_type'; import { MlJobsResponse } from '../../../common/types/job_service'; +import { ANOMALY_SCORE_MATCH_GROUP_ID } from '../../../common/constants/alerts'; function isDefined(argument: T | undefined | null): argument is T { return argument !== undefined && argument !== null; @@ -48,8 +50,9 @@ export function resolveTimeInterval(bucketSpans: string[]): string { /** * Alerting related server-side methods * @param mlClient + * @param esClient */ -export function alertingServiceProvider(mlClient: MlClient) { +export function alertingServiceProvider(mlClient: MlClient, esClient: ElasticsearchClient) { const getAggResultsLabel = (resultType: AnomalyResultType) => { return { aggGroupLabel: `${resultType}_results` as PreviewResultsKeys, @@ -496,12 +499,53 @@ export function alertingServiceProvider(mlClient: MlClient) { const anomalyExplorerUrl = buildExplorerUrl(result, params.resultType as AnomalyResultType); - return { + const executionResult = { ...result, name: result.key_as_string, anomalyExplorerUrl, kibanaBaseUrl: publicBaseUrl!, }; + + let kibanaEventLogCount = 0; + try { + // Check kibana-event-logs for presence of this alert instance + const kibanaLogResults = await esClient.count({ + index: '.kibana-event-log-*', + body: { + query: { + bool: { + must: [ + { + term: { + 'kibana.alerting.action_group_id': { + value: ANOMALY_SCORE_MATCH_GROUP_ID, + }, + }, + }, + { + term: { + 'kibana.alerting.instance_id': { + value: executionResult.name, + }, + }, + }, + ], + }, + }, + }, + }); + + kibanaEventLogCount = kibanaLogResults.body.count; + } catch (e) { + // eslint-disable-next-line no-console + console.log('Unable to check kibana event logs', e); + } + + if (kibanaEventLogCount > 0) { + return; + } + + return executionResult; }, /** * Checks how often the alert condition will fire an alert instance diff --git a/x-pack/plugins/ml/server/routes/alerting.ts b/x-pack/plugins/ml/server/routes/alerting.ts index 7b7f3a7db972..a268a5200b35 100644 --- a/x-pack/plugins/ml/server/routes/alerting.ts +++ b/x-pack/plugins/ml/server/routes/alerting.ts @@ -30,9 +30,9 @@ export function alertingRoutes({ router, routeGuard }: RouteInitialization) { tags: ['access:ml:canGetJobs'], }, }, - routeGuard.fullLicenseAPIGuard(async ({ mlClient, request, response }) => { + routeGuard.fullLicenseAPIGuard(async ({ mlClient, request, response, client }) => { try { - const alertingService = alertingServiceProvider(mlClient); + const alertingService = alertingServiceProvider(mlClient, client.asInternalUser); const result = await alertingService.preview(request.body); diff --git a/x-pack/plugins/ml/server/shared_services/providers/alerting_service.ts b/x-pack/plugins/ml/server/shared_services/providers/alerting_service.ts index 318dac200a87..cbe22478e12d 100644 --- a/x-pack/plugins/ml/server/shared_services/providers/alerting_service.ts +++ b/x-pack/plugins/ml/server/shared_services/providers/alerting_service.ts @@ -20,7 +20,9 @@ export function getAlertingServiceProvider(getGuards: GetGuards) { return await getGuards(request, savedObjectsClient) .isFullLicense() .hasMlCapabilities(['canGetJobs']) - .ok(({ mlClient }) => alertingServiceProvider(mlClient).preview(...args)); + .ok(({ mlClient, scopedClient }) => + alertingServiceProvider(mlClient, scopedClient.asInternalUser).preview(...args) + ); }, execute: async ( ...args: Parameters @@ -28,7 +30,9 @@ export function getAlertingServiceProvider(getGuards: GetGuards) { return await getGuards(request, savedObjectsClient) .isFullLicense() .hasMlCapabilities(['canGetJobs']) - .ok(({ mlClient }) => alertingServiceProvider(mlClient).execute(...args)); + .ok(({ mlClient, scopedClient }) => + alertingServiceProvider(mlClient, scopedClient.asInternalUser).execute(...args) + ); }, }; }, From f393a0f5790cc72e168950eecfb27edd840b8589 Mon Sep 17 00:00:00 2001 From: Dima Arnautov Date: Tue, 16 Feb 2021 14:44:52 +0100 Subject: [PATCH 2/6] [ML] create alert instance key, add check for alert id --- x-pack/plugins/ml/common/types/alerts.ts | 13 ++-- .../ml/server/lib/alerts/alerting_service.ts | 61 ++++++++++++++++--- .../register_anomaly_detection_alert_type.ts | 4 +- 3 files changed, 61 insertions(+), 17 deletions(-) diff --git a/x-pack/plugins/ml/common/types/alerts.ts b/x-pack/plugins/ml/common/types/alerts.ts index 51d06b990623..7e6e9d89c5a6 100644 --- a/x-pack/plugins/ml/common/types/alerts.ts +++ b/x-pack/plugins/ml/common/types/alerts.ts @@ -15,7 +15,7 @@ export type TopHitsResultsKeys = 'top_record_hits' | 'top_bucket_hits' | 'top_in export interface AlertExecutionResult { count: number; key: number; - key_as_string: string; + alertInstanceKey: string; isInterim: boolean; jobIds: string[]; timestamp: number; @@ -47,10 +47,13 @@ interface BaseAnomalyAlertDoc { export interface RecordAnomalyAlertDoc extends BaseAnomalyAlertDoc { result_type: typeof ANOMALY_RESULT_TYPE.RECORD; function: string; - field_name: string; - by_field_value: string | number; - over_field_value: string | number; - partition_field_value: string | number; + field_name?: string; + by_field_name?: string; + by_field_value?: string | number; + over_field_name?: string; + over_field_value?: string | number; + partition_field_name?: string; + partition_field_value?: string | number; } export interface BucketAnomalyAlertDoc extends BaseAnomalyAlertDoc { diff --git a/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts b/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts index e30cc997343b..cc28c0085c5f 100644 --- a/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts +++ b/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts @@ -180,8 +180,11 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea 'is_interim', 'function', 'field_name', + 'by_field_name', 'by_field_value', + 'over_field_name', 'over_field_value', + 'partition_field_name', 'partition_field_value', 'job_id', ], @@ -260,6 +263,26 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea }; }; + /** + * Provides unique key for the anomaly result. + */ + const getAlertInstanceKey = (source: any): string => { + let alertInstanceKey = `${source.job_id}_${source.timestamp}`; + if (source.result_type === ANOMALY_RESULT_TYPE.INFLUENCER) { + alertInstanceKey += `_${source.influencer_field_name}_${source.influencer_field_value}`; + } else if (source.result_type === ANOMALY_RESULT_TYPE.RECORD) { + const fieldName = + source.field_name ?? + source.by_field_name ?? + source.over_field_name ?? + source.partition_field_name; + const fieldValue = + source.by_field_value ?? source.over_field_value ?? source.partition_field_value; + alertInstanceKey += `_${source.function}_${fieldName}_${fieldValue}`; + } + return alertInstanceKey; + }; + /** * Builds a request body * @param params @@ -373,19 +396,22 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea const aggTypeResults = v[resultsLabel.aggGroupLabel]; const requestedAnomalies = aggTypeResults[resultsLabel.topHitsLabel].hits.hits; + const topAnomaly = requestedAnomalies[0]; + const alertInstanceKey = getAlertInstanceKey(topAnomaly._source); + return { count: aggTypeResults.doc_count, key: v.key, - key_as_string: v.key_as_string, + alertInstanceKey, jobIds: [...new Set(requestedAnomalies.map((h) => h._source.job_id))], isInterim: requestedAnomalies.some((h) => h._source.is_interim), - timestamp: requestedAnomalies[0]._source.timestamp, - timestampIso8601: requestedAnomalies[0].fields.timestamp_iso8601[0], - timestampEpoch: requestedAnomalies[0].fields.timestamp_epoch[0], - score: requestedAnomalies[0].fields.score[0], + timestamp: topAnomaly._source.timestamp, + timestampIso8601: topAnomaly.fields.timestamp_iso8601[0], + timestampEpoch: topAnomaly.fields.timestamp_epoch[0], + score: topAnomaly.fields.score[0], bucketRange: { - start: requestedAnomalies[0].fields.start[0], - end: requestedAnomalies[0].fields.end[0], + start: topAnomaly.fields.start[0], + end: topAnomaly.fields.end[0], }, topRecords: v.record_results.top_record_hits.hits.hits.map((h) => ({ ...h._source, @@ -482,11 +508,14 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea /** * Return the result of an alert condition execution. * - * @param params + * @param params - Alert params + * @param publicBaseUrl + * @param alertId - Alert ID */ execute: async ( params: MlAnomalyDetectionAlertParams, - publicBaseUrl: string | undefined + publicBaseUrl: string | undefined, + alertId: string ): Promise => { const res = await fetchAnomalies(params); @@ -501,7 +530,7 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea const executionResult = { ...result, - name: result.key_as_string, + name: result.alertInstanceKey, anomalyExplorerUrl, kibanaBaseUrl: publicBaseUrl!, }; @@ -529,6 +558,18 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea }, }, }, + { + nested: { + path: 'kibana.saved_objects', + query: { + term: { + 'kibana.saved_objects.id': { + value: alertId, + }, + }, + }, + }, + }, ], }, }, diff --git a/x-pack/plugins/ml/server/lib/alerts/register_anomaly_detection_alert_type.ts b/x-pack/plugins/ml/server/lib/alerts/register_anomaly_detection_alert_type.ts index 6f8fa59aa231..9a9d68966747 100644 --- a/x-pack/plugins/ml/server/lib/alerts/register_anomaly_detection_alert_type.ts +++ b/x-pack/plugins/ml/server/lib/alerts/register_anomaly_detection_alert_type.ts @@ -123,13 +123,13 @@ export function registerAnomalyDetectionAlertType({ }, producer: PLUGIN_ID, minimumLicenseRequired: MINIMUM_FULL_LICENSE, - async executor({ services, params }) { + async executor({ services, params, alertId }) { const fakeRequest = {} as KibanaRequest; const { execute } = mlSharedServices.alertingServiceProvider( services.savedObjectsClient, fakeRequest ); - const executionResult = await execute(params, publicBaseUrl); + const executionResult = await execute(params, publicBaseUrl, alertId); if (executionResult) { const alertInstanceName = executionResult.name; From a3d3d13b225dfc80c611d28d16b9054d07a7f4e1 Mon Sep 17 00:00:00 2001 From: Dima Arnautov Date: Wed, 17 Feb 2021 10:01:49 +0100 Subject: [PATCH 3/6] [ML] use anomaly_utils, check interval gap --- .../plugins/ml/common/util/anomaly_utils.ts | 4 -- .../ml/server/lib/alerts/alerting_service.ts | 49 ++++++++++++------- .../register_anomaly_detection_alert_type.ts | 10 +++- .../server/lib/alerts/register_ml_alerts.ts | 2 + x-pack/plugins/ml/server/plugin.ts | 1 + 5 files changed, 42 insertions(+), 24 deletions(-) diff --git a/x-pack/plugins/ml/common/util/anomaly_utils.ts b/x-pack/plugins/ml/common/util/anomaly_utils.ts index 028afee2524c..68605f29c7be 100644 --- a/x-pack/plugins/ml/common/util/anomaly_utils.ts +++ b/x-pack/plugins/ml/common/util/anomaly_utils.ts @@ -230,8 +230,6 @@ export function getEntityFieldName(record: AnomalyRecordDoc): string | undefined if (record.partition_field_name !== undefined) { return record.partition_field_name; } - - return undefined; } // Returns the value of the field to use as the entity value from the source record @@ -249,8 +247,6 @@ export function getEntityFieldValue(record: AnomalyRecordDoc): string | number | if (record.partition_field_value !== undefined) { return record.partition_field_value; } - - return undefined; } // Returns the list of partitioning entity fields for the source record as a list diff --git a/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts b/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts index cc28c0085c5f..b565a02fe603 100644 --- a/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts +++ b/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts @@ -8,6 +8,8 @@ import Boom from '@hapi/boom'; import rison from 'rison-node'; import { ElasticsearchClient } from 'kibana/server'; +import moment from 'moment'; +import { Duration } from 'moment/moment'; import { MlClient } from '../ml_client'; import { MlAnomalyDetectionAlertParams, @@ -27,6 +29,7 @@ import { parseInterval } from '../../../common/util/parse_interval'; import { AnomalyDetectionAlertContext } from './register_anomaly_detection_alert_type'; import { MlJobsResponse } from '../../../common/types/job_service'; import { ANOMALY_SCORE_MATCH_GROUP_ID } from '../../../common/constants/alerts'; +import { getEntityFieldName, getEntityFieldValue } from '../../../common/util/anomaly_utils'; function isDefined(argument: T | undefined | null): argument is T { return argument !== undefined && argument !== null; @@ -36,15 +39,15 @@ function isDefined(argument: T | undefined | null): argument is T { * Resolves the longest bucket span from the list and multiply it by 2. * @param bucketSpans Collection of bucket spans */ -export function resolveTimeInterval(bucketSpans: string[]): string { - return `${ +export function resolveBucketSpanInSeconds(bucketSpans: string[]): number { + return ( Math.max( ...bucketSpans .map((b) => parseInterval(b)) .filter(isDefined) .map((v) => v.asSeconds()) ) * 2 - }s`; + ); } /** @@ -271,13 +274,8 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea if (source.result_type === ANOMALY_RESULT_TYPE.INFLUENCER) { alertInstanceKey += `_${source.influencer_field_name}_${source.influencer_field_value}`; } else if (source.result_type === ANOMALY_RESULT_TYPE.RECORD) { - const fieldName = - source.field_name ?? - source.by_field_name ?? - source.over_field_name ?? - source.partition_field_name; - const fieldValue = - source.by_field_value ?? source.over_field_value ?? source.partition_field_value; + const fieldName = getEntityFieldName(source); + const fieldValue = getEntityFieldValue(source); alertInstanceKey += `_${source.function}_${fieldName}_${fieldValue}`; } return alertInstanceKey; @@ -285,12 +283,14 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea /** * Builds a request body - * @param params - * @param previewTimeInterval + * @param params - Alert params + * @param previewTimeInterval - Relative time interval to test the alert condition + * @param checkIntervalGap - Interval between alert executions */ const fetchAnomalies = async ( params: MlAnomalyDetectionAlertParams, - previewTimeInterval?: string + previewTimeInterval?: string, + checkIntervalGap?: Duration ): Promise => { const jobAndGroupIds = [ ...(params.jobSelection.jobIds ?? []), @@ -307,9 +307,14 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea return; } - const lookBackTimeInterval = resolveTimeInterval( - jobsResponse.map((v) => v.analysis_config.bucket_span) - ); + /** + * The check interval might be bigger than the 2x bucket span. + * We need to check the biggest time range to make sure anomalies are not missed. + */ + const lookBackTimeInterval = `${Math.max( + resolveBucketSpanInSeconds(jobsResponse.map((v) => v.analysis_config.bucket_span)), + checkIntervalGap ? checkIntervalGap.asSeconds() : 0 + )}s`; const jobIds = jobsResponse.map((v) => v.job_id); @@ -511,13 +516,21 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea * @param params - Alert params * @param publicBaseUrl * @param alertId - Alert ID + * @param startedAt + * @param previousStartedAt */ execute: async ( params: MlAnomalyDetectionAlertParams, publicBaseUrl: string | undefined, - alertId: string + alertId: string, + startedAt: Date, + previousStartedAt: Date | null ): Promise => { - const res = await fetchAnomalies(params); + const checkIntervalGap = previousStartedAt + ? moment.duration(moment(startedAt).diff(previousStartedAt)) + : undefined; + + const res = await fetchAnomalies(params, undefined, checkIntervalGap); if (!res) { throw new Error('No results found'); diff --git a/x-pack/plugins/ml/server/lib/alerts/register_anomaly_detection_alert_type.ts b/x-pack/plugins/ml/server/lib/alerts/register_anomaly_detection_alert_type.ts index 9a9d68966747..30a92c02cefc 100644 --- a/x-pack/plugins/ml/server/lib/alerts/register_anomaly_detection_alert_type.ts +++ b/x-pack/plugins/ml/server/lib/alerts/register_anomaly_detection_alert_type.ts @@ -123,13 +123,19 @@ export function registerAnomalyDetectionAlertType({ }, producer: PLUGIN_ID, minimumLicenseRequired: MINIMUM_FULL_LICENSE, - async executor({ services, params, alertId }) { + async executor({ services, params, alertId, state, previousStartedAt, startedAt }) { const fakeRequest = {} as KibanaRequest; const { execute } = mlSharedServices.alertingServiceProvider( services.savedObjectsClient, fakeRequest ); - const executionResult = await execute(params, publicBaseUrl, alertId); + const executionResult = await execute( + params, + publicBaseUrl, + alertId, + startedAt, + previousStartedAt + ); if (executionResult) { const alertInstanceName = executionResult.name; diff --git a/x-pack/plugins/ml/server/lib/alerts/register_ml_alerts.ts b/x-pack/plugins/ml/server/lib/alerts/register_ml_alerts.ts index 5c9106d78595..371c5435f91d 100644 --- a/x-pack/plugins/ml/server/lib/alerts/register_ml_alerts.ts +++ b/x-pack/plugins/ml/server/lib/alerts/register_ml_alerts.ts @@ -5,12 +5,14 @@ * 2.0. */ +import { Logger } from 'kibana/server'; import { AlertingPlugin } from '../../../../alerts/server'; import { registerAnomalyDetectionAlertType } from './register_anomaly_detection_alert_type'; import { SharedServices } from '../../shared_services'; export interface RegisterAlertParams { alerts: AlertingPlugin['setup']; + logger: Logger; mlSharedServices: SharedServices; publicBaseUrl: string | undefined; } diff --git a/x-pack/plugins/ml/server/plugin.ts b/x-pack/plugins/ml/server/plugin.ts index 10ed70d7f739..24fac9184cc2 100644 --- a/x-pack/plugins/ml/server/plugin.ts +++ b/x-pack/plugins/ml/server/plugin.ts @@ -211,6 +211,7 @@ export class MlServerPlugin if (plugins.alerts) { registerMlAlerts({ alerts: plugins.alerts, + logger: this.log, mlSharedServices: sharedServices, publicBaseUrl: coreSetup.http.basePath.publicBaseUrl, }); From 3e910c238ae187850f81b2be1b7cdff3cbdb9a3a Mon Sep 17 00:00:00 2001 From: Dima Arnautov Date: Wed, 17 Feb 2021 16:06:54 +0100 Subject: [PATCH 4/6] [ML] add detector index --- x-pack/plugins/ml/server/lib/alerts/alerting_service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts b/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts index b565a02fe603..45785c9b8f19 100644 --- a/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts +++ b/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts @@ -276,7 +276,7 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea } else if (source.result_type === ANOMALY_RESULT_TYPE.RECORD) { const fieldName = getEntityFieldName(source); const fieldValue = getEntityFieldValue(source); - alertInstanceKey += `_${source.function}_${fieldName}_${fieldValue}`; + alertInstanceKey += `_${source.detector_index}_${source.function}_${fieldName}_${fieldValue}`; } return alertInstanceKey; }; From 224b3b366cc8aeaca577d6512b92bed4e230262d Mon Sep 17 00:00:00 2001 From: Dima Arnautov Date: Wed, 17 Feb 2021 16:52:26 +0100 Subject: [PATCH 5/6] [ML] fix unit test --- x-pack/plugins/ml/server/lib/alerts/alerting_service.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugins/ml/server/lib/alerts/alerting_service.test.ts b/x-pack/plugins/ml/server/lib/alerts/alerting_service.test.ts index 261fac7b620b..f029fa24f960 100644 --- a/x-pack/plugins/ml/server/lib/alerts/alerting_service.test.ts +++ b/x-pack/plugins/ml/server/lib/alerts/alerting_service.test.ts @@ -5,10 +5,10 @@ * 2.0. */ -import { resolveTimeInterval } from './alerting_service'; +import { resolveBucketSpanInSeconds } from './alerting_service'; describe('Alerting Service', () => { test('should resolve maximum bucket interval', () => { - expect(resolveTimeInterval(['15m', '1h', '6h', '90s'])).toBe('43200s'); + expect(resolveBucketSpanInSeconds(['15m', '1h', '6h', '90s'])).toBe(43200); }); }); From 61764c5ba3ea89afd5aecfb7fde636f7a6da923c Mon Sep 17 00:00:00 2001 From: Dima Arnautov Date: Wed, 17 Feb 2021 17:04:06 +0100 Subject: [PATCH 6/6] [ML] include detector_index into source --- x-pack/plugins/ml/server/lib/alerts/alerting_service.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts b/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts index 45785c9b8f19..6e7cd77e450b 100644 --- a/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts +++ b/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts @@ -190,6 +190,7 @@ export function alertingServiceProvider(mlClient: MlClient, esClient: Elasticsea 'partition_field_name', 'partition_field_value', 'job_id', + 'detector_index', ], }, size: 3,