Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Prevent duplicate notifications about the same anomaly result #91485

Merged
merged 6 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions x-pack/plugins/ml/common/types/alerts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export type TopHitsResultsKeys = 'top_record_hits' | 'top_bucket_hits' | 'top_in
export interface AlertExecutionResult {
count: number;
key: number;
key_as_string: string;
alertInstanceKey: string;
isInterim: boolean;
jobIds: string[];
timestamp: number;
Expand Down Expand Up @@ -47,10 +47,13 @@ interface BaseAnomalyAlertDoc {
export interface RecordAnomalyAlertDoc extends BaseAnomalyAlertDoc {
result_type: typeof ANOMALY_RESULT_TYPE.RECORD;
function: string;
field_name: string;
by_field_value: string | number;
over_field_value: string | number;
partition_field_value: string | number;
field_name?: string;
by_field_name?: string;
by_field_value?: string | number;
over_field_name?: string;
over_field_value?: string | number;
partition_field_name?: string;
partition_field_value?: string | number;
}

export interface BucketAnomalyAlertDoc extends BaseAnomalyAlertDoc {
Expand Down
4 changes: 0 additions & 4 deletions x-pack/plugins/ml/common/util/anomaly_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,6 @@ export function getEntityFieldName(record: AnomalyRecordDoc): string | undefined
if (record.partition_field_name !== undefined) {
return record.partition_field_name;
}

return undefined;
}

// Returns the value of the field to use as the entity value from the source record
Expand All @@ -249,8 +247,6 @@ export function getEntityFieldValue(record: AnomalyRecordDoc): string | number |
if (record.partition_field_value !== undefined) {
return record.partition_field_value;
}

return undefined;
}

// Returns the list of partitioning entity fields for the source record as a list
Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugins/ml/server/lib/alerts/alerting_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
* 2.0.
*/

import { resolveTimeInterval } from './alerting_service';
import { resolveBucketSpanInSeconds } from './alerting_service';

describe('Alerting Service', () => {
test('should resolve maximum bucket interval', () => {
expect(resolveTimeInterval(['15m', '1h', '6h', '90s'])).toBe('43200s');
expect(resolveBucketSpanInSeconds(['15m', '1h', '6h', '90s'])).toBe(43200);
});
});
143 changes: 121 additions & 22 deletions x-pack/plugins/ml/server/lib/alerts/alerting_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

import Boom from '@hapi/boom';
import rison from 'rison-node';
import { ElasticsearchClient } from 'kibana/server';
import moment from 'moment';
import { Duration } from 'moment/moment';
import { MlClient } from '../ml_client';
import {
MlAnomalyDetectionAlertParams,
Expand All @@ -25,6 +28,8 @@ import {
import { parseInterval } from '../../../common/util/parse_interval';
import { AnomalyDetectionAlertContext } from './register_anomaly_detection_alert_type';
import { MlJobsResponse } from '../../../common/types/job_service';
import { ANOMALY_SCORE_MATCH_GROUP_ID } from '../../../common/constants/alerts';
import { getEntityFieldName, getEntityFieldValue } from '../../../common/util/anomaly_utils';

function isDefined<T>(argument: T | undefined | null): argument is T {
return argument !== undefined && argument !== null;
Expand All @@ -34,22 +39,23 @@ function isDefined<T>(argument: T | undefined | null): argument is T {
* Resolves the longest bucket span from the list and multiply it by 2.
* @param bucketSpans Collection of bucket spans
*/
export function resolveTimeInterval(bucketSpans: string[]): string {
return `${
export function resolveBucketSpanInSeconds(bucketSpans: string[]): number {
return (
Math.max(
...bucketSpans
.map((b) => parseInterval(b))
.filter(isDefined)
.map((v) => v.asSeconds())
) * 2
}s`;
);
}

/**
* Alerting related server-side methods
* @param mlClient
* @param esClient
*/
export function alertingServiceProvider(mlClient: MlClient) {
export function alertingServiceProvider(mlClient: MlClient, esClient: ElasticsearchClient) {
const getAggResultsLabel = (resultType: AnomalyResultType) => {
return {
aggGroupLabel: `${resultType}_results` as PreviewResultsKeys,
Expand Down Expand Up @@ -177,10 +183,14 @@ export function alertingServiceProvider(mlClient: MlClient) {
'is_interim',
'function',
'field_name',
'by_field_name',
'by_field_value',
'over_field_name',
'over_field_value',
'partition_field_name',
'partition_field_value',
'job_id',
'detector_index',
],
},
size: 3,
Expand Down Expand Up @@ -257,14 +267,31 @@ export function alertingServiceProvider(mlClient: MlClient) {
};
};

/**
* Provides unique key for the anomaly result.
*/
const getAlertInstanceKey = (source: any): string => {
let alertInstanceKey = `${source.job_id}_${source.timestamp}`;
if (source.result_type === ANOMALY_RESULT_TYPE.INFLUENCER) {
alertInstanceKey += `_${source.influencer_field_name}_${source.influencer_field_value}`;
} else if (source.result_type === ANOMALY_RESULT_TYPE.RECORD) {
const fieldName = getEntityFieldName(source);
const fieldValue = getEntityFieldValue(source);
alertInstanceKey += `_${source.detector_index}_${source.function}_${fieldName}_${fieldValue}`;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

source.detector_index is undefined in the key my test generated. Is this available in the source ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forgot to include it into the source, fixed in 61764c5

}
return alertInstanceKey;
};

/**
* Builds a request body
* @param params
* @param previewTimeInterval
* @param params - Alert params
* @param previewTimeInterval - Relative time interval to test the alert condition
* @param checkIntervalGap - Interval between alert executions
*/
const fetchAnomalies = async (
params: MlAnomalyDetectionAlertParams,
previewTimeInterval?: string
previewTimeInterval?: string,
checkIntervalGap?: Duration
): Promise<AlertExecutionResult[] | undefined> => {
const jobAndGroupIds = [
...(params.jobSelection.jobIds ?? []),
Expand All @@ -281,9 +308,14 @@ export function alertingServiceProvider(mlClient: MlClient) {
return;
}

const lookBackTimeInterval = resolveTimeInterval(
jobsResponse.map((v) => v.analysis_config.bucket_span)
);
/**
* The check interval might be bigger than the 2x bucket span.
* We need to check the biggest time range to make sure anomalies are not missed.
*/
const lookBackTimeInterval = `${Math.max(
resolveBucketSpanInSeconds(jobsResponse.map((v) => v.analysis_config.bucket_span)),
checkIntervalGap ? checkIntervalGap.asSeconds() : 0
)}s`;

const jobIds = jobsResponse.map((v) => v.job_id);

Expand Down Expand Up @@ -370,19 +402,22 @@ export function alertingServiceProvider(mlClient: MlClient) {
const aggTypeResults = v[resultsLabel.aggGroupLabel];
const requestedAnomalies = aggTypeResults[resultsLabel.topHitsLabel].hits.hits;

const topAnomaly = requestedAnomalies[0];
const alertInstanceKey = getAlertInstanceKey(topAnomaly._source);

return {
count: aggTypeResults.doc_count,
key: v.key,
key_as_string: v.key_as_string,
alertInstanceKey,
jobIds: [...new Set(requestedAnomalies.map((h) => h._source.job_id))],
isInterim: requestedAnomalies.some((h) => h._source.is_interim),
timestamp: requestedAnomalies[0]._source.timestamp,
timestampIso8601: requestedAnomalies[0].fields.timestamp_iso8601[0],
timestampEpoch: requestedAnomalies[0].fields.timestamp_epoch[0],
score: requestedAnomalies[0].fields.score[0],
timestamp: topAnomaly._source.timestamp,
timestampIso8601: topAnomaly.fields.timestamp_iso8601[0],
timestampEpoch: topAnomaly.fields.timestamp_epoch[0],
score: topAnomaly.fields.score[0],
bucketRange: {
start: requestedAnomalies[0].fields.start[0],
end: requestedAnomalies[0].fields.end[0],
start: topAnomaly.fields.start[0],
end: topAnomaly.fields.end[0],
},
topRecords: v.record_results.top_record_hits.hits.hits.map((h) => ({
...h._source,
Expand Down Expand Up @@ -479,13 +514,24 @@ export function alertingServiceProvider(mlClient: MlClient) {
/**
* Return the result of an alert condition execution.
*
* @param params
* @param params - Alert params
* @param publicBaseUrl
* @param alertId - Alert ID
* @param startedAt
* @param previousStartedAt
*/
execute: async (
params: MlAnomalyDetectionAlertParams,
publicBaseUrl: string | undefined
publicBaseUrl: string | undefined,
alertId: string,
startedAt: Date,
previousStartedAt: Date | null
): Promise<AnomalyDetectionAlertContext | undefined> => {
const res = await fetchAnomalies(params);
const checkIntervalGap = previousStartedAt
? moment.duration(moment(startedAt).diff(previousStartedAt))
: undefined;

const res = await fetchAnomalies(params, undefined, checkIntervalGap);

if (!res) {
throw new Error('No results found');
Expand All @@ -496,12 +542,65 @@ export function alertingServiceProvider(mlClient: MlClient) {

const anomalyExplorerUrl = buildExplorerUrl(result, params.resultType as AnomalyResultType);

return {
const executionResult = {
...result,
name: result.key_as_string,
name: result.alertInstanceKey,
anomalyExplorerUrl,
kibanaBaseUrl: publicBaseUrl!,
};

let kibanaEventLogCount = 0;
try {
// Check kibana-event-logs for presence of this alert instance
const kibanaLogResults = await esClient.count({
index: '.kibana-event-log-*',
body: {
query: {
bool: {
must: [
{
term: {
'kibana.alerting.action_group_id': {
value: ANOMALY_SCORE_MATCH_GROUP_ID,
},
},
},
{
term: {
'kibana.alerting.instance_id': {
value: executionResult.name,
},
},
},
{
nested: {
path: 'kibana.saved_objects',
query: {
term: {
'kibana.saved_objects.id': {
value: alertId,
},
},
},
},
},
],
},
},
},
});

kibanaEventLogCount = kibanaLogResults.body.count;
} catch (e) {
// eslint-disable-next-line no-console
console.log('Unable to check kibana event logs', e);
}

if (kibanaEventLogCount > 0) {
return;
}

return executionResult;
},
/**
* Checks how often the alert condition will fire an alert instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,19 @@ export function registerAnomalyDetectionAlertType({
},
producer: PLUGIN_ID,
minimumLicenseRequired: MINIMUM_FULL_LICENSE,
async executor({ services, params }) {
async executor({ services, params, alertId, state, previousStartedAt, startedAt }) {
const fakeRequest = {} as KibanaRequest;
const { execute } = mlSharedServices.alertingServiceProvider(
services.savedObjectsClient,
fakeRequest
);
const executionResult = await execute(params, publicBaseUrl);
const executionResult = await execute(
params,
publicBaseUrl,
alertId,
startedAt,
previousStartedAt
);

if (executionResult) {
const alertInstanceName = executionResult.name;
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/ml/server/lib/alerts/register_ml_alerts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
* 2.0.
*/

import { Logger } from 'kibana/server';
import { AlertingPlugin } from '../../../../alerts/server';
import { registerAnomalyDetectionAlertType } from './register_anomaly_detection_alert_type';
import { SharedServices } from '../../shared_services';

export interface RegisterAlertParams {
alerts: AlertingPlugin['setup'];
logger: Logger;
mlSharedServices: SharedServices;
publicBaseUrl: string | undefined;
}
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/ml/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ export class MlServerPlugin
if (plugins.alerts) {
registerMlAlerts({
alerts: plugins.alerts,
logger: this.log,
mlSharedServices: sharedServices,
publicBaseUrl: coreSetup.http.basePath.publicBaseUrl,
});
Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugins/ml/server/routes/alerting.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ export function alertingRoutes({ router, routeGuard }: RouteInitialization) {
tags: ['access:ml:canGetJobs'],
},
},
routeGuard.fullLicenseAPIGuard(async ({ mlClient, request, response }) => {
routeGuard.fullLicenseAPIGuard(async ({ mlClient, request, response, client }) => {
try {
const alertingService = alertingServiceProvider(mlClient);
const alertingService = alertingServiceProvider(mlClient, client.asInternalUser);

const result = await alertingService.preview(request.body);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ export function getAlertingServiceProvider(getGuards: GetGuards) {
return await getGuards(request, savedObjectsClient)
.isFullLicense()
.hasMlCapabilities(['canGetJobs'])
.ok(({ mlClient }) => alertingServiceProvider(mlClient).preview(...args));
.ok(({ mlClient, scopedClient }) =>
alertingServiceProvider(mlClient, scopedClient.asInternalUser).preview(...args)
);
},
execute: async (
...args: Parameters<MlAlertingService['execute']>
): ReturnType<MlAlertingService['execute']> => {
return await getGuards(request, savedObjectsClient)
.isFullLicense()
.hasMlCapabilities(['canGetJobs'])
.ok(({ mlClient }) => alertingServiceProvider(mlClient).execute(...args));
.ok(({ mlClient, scopedClient }) =>
alertingServiceProvider(mlClient, scopedClient.asInternalUser).execute(...args)
);
},
};
},
Expand Down