Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Prevent duplicate notifications about the same anomaly result #91485

Merged
merged 6 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions x-pack/plugins/ml/common/types/alerts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export type TopHitsResultsKeys = 'top_record_hits' | 'top_bucket_hits' | 'top_in
export interface AlertExecutionResult {
count: number;
key: number;
key_as_string: string;
alertInstanceKey: string;
isInterim: boolean;
jobIds: string[];
timestamp: number;
Expand Down Expand Up @@ -47,10 +47,13 @@ interface BaseAnomalyAlertDoc {
export interface RecordAnomalyAlertDoc extends BaseAnomalyAlertDoc {
result_type: typeof ANOMALY_RESULT_TYPE.RECORD;
function: string;
field_name: string;
by_field_value: string | number;
over_field_value: string | number;
partition_field_value: string | number;
field_name?: string;
by_field_name?: string;
by_field_value?: string | number;
over_field_name?: string;
over_field_value?: string | number;
partition_field_name?: string;
partition_field_value?: string | number;
}

export interface BucketAnomalyAlertDoc extends BaseAnomalyAlertDoc {
Expand Down
109 changes: 97 additions & 12 deletions x-pack/plugins/ml/server/lib/alerts/alerting_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import Boom from '@hapi/boom';
import rison from 'rison-node';
import { ElasticsearchClient } from 'kibana/server';
import { MlClient } from '../ml_client';
import {
MlAnomalyDetectionAlertParams,
Expand All @@ -25,6 +26,7 @@ import {
import { parseInterval } from '../../../common/util/parse_interval';
import { AnomalyDetectionAlertContext } from './register_anomaly_detection_alert_type';
import { MlJobsResponse } from '../../../common/types/job_service';
import { ANOMALY_SCORE_MATCH_GROUP_ID } from '../../../common/constants/alerts';

function isDefined<T>(argument: T | undefined | null): argument is T {
return argument !== undefined && argument !== null;
Expand All @@ -48,8 +50,9 @@ export function resolveTimeInterval(bucketSpans: string[]): string {
/**
* Alerting related server-side methods
* @param mlClient
* @param esClient
*/
export function alertingServiceProvider(mlClient: MlClient) {
export function alertingServiceProvider(mlClient: MlClient, esClient: ElasticsearchClient) {
const getAggResultsLabel = (resultType: AnomalyResultType) => {
return {
aggGroupLabel: `${resultType}_results` as PreviewResultsKeys,
Expand Down Expand Up @@ -177,8 +180,11 @@ export function alertingServiceProvider(mlClient: MlClient) {
'is_interim',
'function',
'field_name',
'by_field_name',
'by_field_value',
'over_field_name',
'over_field_value',
'partition_field_name',
'partition_field_value',
'job_id',
],
Expand Down Expand Up @@ -257,6 +263,26 @@ export function alertingServiceProvider(mlClient: MlClient) {
};
};

/**
* Provides unique key for the anomaly result.
*/
const getAlertInstanceKey = (source: any): string => {
let alertInstanceKey = `${source.job_id}_${source.timestamp}`;
if (source.result_type === ANOMALY_RESULT_TYPE.INFLUENCER) {
alertInstanceKey += `_${source.influencer_field_name}_${source.influencer_field_value}`;
} else if (source.result_type === ANOMALY_RESULT_TYPE.RECORD) {
const fieldName =
source.field_name ??
peteharverson marked this conversation as resolved.
Show resolved Hide resolved
source.by_field_name ??
source.over_field_name ??
source.partition_field_name;
const fieldValue =
source.by_field_value ?? source.over_field_value ?? source.partition_field_value;
alertInstanceKey += `_${source.function}_${fieldName}_${fieldValue}`;
}
return alertInstanceKey;
};

/**
* Builds a request body
* @param params
Expand Down Expand Up @@ -370,19 +396,22 @@ export function alertingServiceProvider(mlClient: MlClient) {
const aggTypeResults = v[resultsLabel.aggGroupLabel];
const requestedAnomalies = aggTypeResults[resultsLabel.topHitsLabel].hits.hits;

const topAnomaly = requestedAnomalies[0];
const alertInstanceKey = getAlertInstanceKey(topAnomaly._source);

return {
count: aggTypeResults.doc_count,
key: v.key,
key_as_string: v.key_as_string,
alertInstanceKey,
jobIds: [...new Set(requestedAnomalies.map((h) => h._source.job_id))],
isInterim: requestedAnomalies.some((h) => h._source.is_interim),
timestamp: requestedAnomalies[0]._source.timestamp,
timestampIso8601: requestedAnomalies[0].fields.timestamp_iso8601[0],
timestampEpoch: requestedAnomalies[0].fields.timestamp_epoch[0],
score: requestedAnomalies[0].fields.score[0],
timestamp: topAnomaly._source.timestamp,
timestampIso8601: topAnomaly.fields.timestamp_iso8601[0],
timestampEpoch: topAnomaly.fields.timestamp_epoch[0],
score: topAnomaly.fields.score[0],
bucketRange: {
start: requestedAnomalies[0].fields.start[0],
end: requestedAnomalies[0].fields.end[0],
start: topAnomaly.fields.start[0],
end: topAnomaly.fields.end[0],
},
topRecords: v.record_results.top_record_hits.hits.hits.map((h) => ({
...h._source,
Expand Down Expand Up @@ -479,11 +508,14 @@ export function alertingServiceProvider(mlClient: MlClient) {
/**
* Return the result of an alert condition execution.
*
* @param params
* @param params - Alert params
* @param publicBaseUrl
* @param alertId - Alert ID
*/
execute: async (
params: MlAnomalyDetectionAlertParams,
publicBaseUrl: string | undefined
publicBaseUrl: string | undefined,
alertId: string
): Promise<AnomalyDetectionAlertContext | undefined> => {
const res = await fetchAnomalies(params);

Expand All @@ -496,12 +528,65 @@ export function alertingServiceProvider(mlClient: MlClient) {

const anomalyExplorerUrl = buildExplorerUrl(result, params.resultType as AnomalyResultType);

return {
const executionResult = {
...result,
name: result.key_as_string,
name: result.alertInstanceKey,
anomalyExplorerUrl,
kibanaBaseUrl: publicBaseUrl!,
};

let kibanaEventLogCount = 0;
try {
// Check kibana-event-logs for presence of this alert instance
const kibanaLogResults = await esClient.count({
index: '.kibana-event-log-*',
body: {
query: {
bool: {
must: [
{
term: {
'kibana.alerting.action_group_id': {
value: ANOMALY_SCORE_MATCH_GROUP_ID,
},
},
},
{
term: {
'kibana.alerting.instance_id': {
value: executionResult.name,
},
},
},
{
nested: {
path: 'kibana.saved_objects',
query: {
term: {
'kibana.saved_objects.id': {
value: alertId,
},
},
},
},
},
],
},
},
},
});

kibanaEventLogCount = kibanaLogResults.body.count;
} catch (e) {
// eslint-disable-next-line no-console
console.log('Unable to check kibana event logs', e);
}

if (kibanaEventLogCount > 0) {
return;
}

return executionResult;
},
/**
* Checks how often the alert condition will fire an alert instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ export function registerAnomalyDetectionAlertType({
},
producer: PLUGIN_ID,
minimumLicenseRequired: MINIMUM_FULL_LICENSE,
async executor({ services, params }) {
async executor({ services, params, alertId }) {
const fakeRequest = {} as KibanaRequest;
const { execute } = mlSharedServices.alertingServiceProvider(
services.savedObjectsClient,
fakeRequest
);
const executionResult = await execute(params, publicBaseUrl);
const executionResult = await execute(params, publicBaseUrl, alertId);

if (executionResult) {
const alertInstanceName = executionResult.name;
Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugins/ml/server/routes/alerting.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ export function alertingRoutes({ router, routeGuard }: RouteInitialization) {
tags: ['access:ml:canGetJobs'],
},
},
routeGuard.fullLicenseAPIGuard(async ({ mlClient, request, response }) => {
routeGuard.fullLicenseAPIGuard(async ({ mlClient, request, response, client }) => {
try {
const alertingService = alertingServiceProvider(mlClient);
const alertingService = alertingServiceProvider(mlClient, client.asInternalUser);

const result = await alertingService.preview(request.body);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ export function getAlertingServiceProvider(getGuards: GetGuards) {
return await getGuards(request, savedObjectsClient)
.isFullLicense()
.hasMlCapabilities(['canGetJobs'])
.ok(({ mlClient }) => alertingServiceProvider(mlClient).preview(...args));
.ok(({ mlClient, scopedClient }) =>
alertingServiceProvider(mlClient, scopedClient.asInternalUser).preview(...args)
);
},
execute: async (
...args: Parameters<MlAlertingService['execute']>
): ReturnType<MlAlertingService['execute']> => {
return await getGuards(request, savedObjectsClient)
.isFullLicense()
.hasMlCapabilities(['canGetJobs'])
.ok(({ mlClient }) => alertingServiceProvider(mlClient).execute(...args));
.ok(({ mlClient, scopedClient }) =>
alertingServiceProvider(mlClient, scopedClient.asInternalUser).execute(...args)
);
},
};
},
Expand Down