Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Fix date formatting in the alert context of the Anomaly detection job health rule type #109073

Merged
merged 12 commits into from
Aug 19, 2021
7 changes: 5 additions & 2 deletions x-pack/plugins/ml/common/types/kibana.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

// custom edits or fixes for default kibana types which are incomplete

import { SimpleSavedObject } from 'kibana/public';
import { IndexPatternAttributes } from 'src/plugins/data/common';
import type { SimpleSavedObject } from 'kibana/public';
import type { IndexPatternAttributes } from 'src/plugins/data/common';
import type { FieldFormatsRegistry } from '../../../../../src/plugins/field_formats/common';

export type IndexPatternTitle = string;

Expand All @@ -26,3 +27,5 @@ export function isSavedSearchSavedObject(
): ss is SavedSearchSavedObject {
return ss !== null;
}

export type FieldFormatsRegistryProvider = () => Promise<FieldFormatsRegistry>;
3 changes: 2 additions & 1 deletion x-pack/plugins/ml/kibana.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"uiActions",
"kibanaLegacy",
"discover",
"triggersActionsUi"
"triggersActionsUi",
"fieldFormats"
],
"optionalPlugins": [
"alerting",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export function registerJobsHealthAlertingRule(
defaultActionMessage: i18n.translate(
'xpack.ml.alertTypes.jobsHealthAlertingRule.defaultActionMessage',
{
defaultMessage: `Anomaly detection jobs health check result:
defaultMessage: `[\\{\\{rule.name\\}\\}] Anomaly detection jobs health check result:
\\{\\{context.message\\}\\}
\\{\\{#context.results\\}\\}
Job ID: \\{\\{job_id\\}\\}
Expand Down
16 changes: 11 additions & 5 deletions x-pack/plugins/ml/server/lib/alerts/jobs_health_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { AnnotationService } from '../../models/annotation_service/annotation';
import { JobsHealthExecutorOptions } from './register_jobs_monitoring_rule_type';
import { JobAuditMessagesService } from '../../models/job_audit_messages/job_audit_messages';
import { DeepPartial } from '../../../common/types/common';
import { FieldFormatsRegistryProvider } from '../../../common/types/kibana';

const MOCK_DATE_NOW = 1487076708000;

Expand Down Expand Up @@ -148,7 +149,7 @@ describe('JobsHealthService', () => {
} as unknown) as jest.Mocked<AnnotationService>;

const jobAuditMessagesService = ({
getJobsErrors: jest.fn().mockImplementation((jobIds: string) => {
getJobsErrorMessages: jest.fn().mockImplementation((jobIds: string) => {
return Promise.resolve({});
}),
} as unknown) as jest.Mocked<JobAuditMessagesService>;
Expand All @@ -159,11 +160,16 @@ describe('JobsHealthService', () => {
debug: jest.fn(),
} as unknown) as jest.Mocked<Logger>;

const getFieldsFormatRegistry = jest.fn().mockImplementation(() => {
return Promise.resolve({});
}) as jest.Mocked<FieldFormatsRegistryProvider>;

const jobHealthService: JobsHealthService = jobsHealthServiceProvider(
mlClient,
datafeedsService,
annotationService,
jobAuditMessagesService,
getFieldsFormatRegistry,
logger
);

Expand Down Expand Up @@ -279,7 +285,7 @@ describe('JobsHealthService', () => {
missed_docs_count: 11,
},
],
message: '1 job is suffering from delayed data.',
message: 'Job test_job_01 is suffering from delayed data.',
},
},
]);
Expand Down Expand Up @@ -333,7 +339,7 @@ describe('JobsHealthService', () => {
datafeed_state: 'stopped',
},
],
message: 'Datafeed is not started for the following jobs:',
message: 'Datafeed is not started for job test_job_02',
},
},
{
Expand All @@ -347,7 +353,7 @@ describe('JobsHealthService', () => {
},
],
message:
'1 job reached the hard model memory limit. Assign the job more memory and restore from a snapshot from prior to reaching the hard limit.',
'Job test_job_01 reached the hard model memory limit. Assign the job more memory and restore from a snapshot from prior to reaching the hard limit.',
},
},
{
Expand All @@ -369,7 +375,7 @@ describe('JobsHealthService', () => {
missed_docs_count: 8,
},
],
message: '2 jobs are suffering from delayed data.',
message: 'Jobs test_job_01, test_job_02 are suffering from delayed data.',
},
},
]);
Expand Down
97 changes: 73 additions & 24 deletions x-pack/plugins/ml/server/lib/alerts/jobs_health_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
* 2.0.
*/

import { memoize, keyBy } from 'lodash';
import { KibanaRequest, SavedObjectsClientContract } from 'kibana/server';
import { groupBy, keyBy, memoize } from 'lodash';
import { KibanaRequest, Logger, SavedObjectsClientContract } from 'kibana/server';
import { i18n } from '@kbn/i18n';
import { Logger } from 'kibana/server';
import { MlJob } from '@elastic/elasticsearch/api/types';
import { MlClient } from '../ml_client';
import { JobSelection } from '../../routes/schemas/alerting_schema';
Expand All @@ -19,6 +18,7 @@ import { GetGuards } from '../../shared_services/shared_services';
import {
AnomalyDetectionJobsHealthAlertContext,
DelayedDataResponse,
JobsErrorsResponse,
JobsHealthExecutorOptions,
MmlTestResponse,
NotStartedDatafeedResponse,
Expand All @@ -35,6 +35,7 @@ import {
jobAuditMessagesProvider,
JobAuditMessagesService,
} from '../../models/job_audit_messages/job_audit_messages';
import type { FieldFormatsRegistryProvider } from '../../../common/types/kibana';

interface TestResult {
name: string;
Expand All @@ -48,8 +49,18 @@ export function jobsHealthServiceProvider(
datafeedsService: DatafeedsService,
annotationService: AnnotationService,
jobAuditMessagesService: JobAuditMessagesService,
getFieldsFormatRegistry: FieldFormatsRegistryProvider,
logger: Logger
) {
/**
* Provides a callback for date formatting based on the Kibana settings.
*/
const getDateFormatter = memoize(async () => {
const fieldFormatsRegistry = await getFieldsFormatRegistry();
const dateFormatter = fieldFormatsRegistry.deserialize({ id: 'date' });
return dateFormatter.convert.bind(dateFormatter);
});

/**
* Extracts result list of jobs based on included and excluded selection of jobs and groups.
* @param includeJobs
Expand Down Expand Up @@ -121,6 +132,15 @@ export function jobsHealthServiceProvider(
async (jobIds: string[]) => (await mlClient.getJobStats({ job_id: jobIds.join(',') })).body.jobs
);

/** Gets values for translation string */
const getJobsAlertingMessageValues = <T extends Array<{ job_id: string }>>(results: T) => {
const jobIds = results.map((v) => v.job_id);
return {
count: jobIds.length,
jobsString: jobIds.join(', '),
};
};

return {
/**
* Gets not started datafeeds for opened jobs.
Expand Down Expand Up @@ -164,13 +184,15 @@ export function jobsHealthServiceProvider(
async getMmlReport(jobIds: string[]): Promise<MmlTestResponse[]> {
const jobsStats = await getJobStats(jobIds);

const dateFormatter = await getDateFormatter();

return jobsStats
.filter((j) => j.state === 'opened' && j.model_size_stats.memory_status !== 'ok')
.map(({ job_id: jobId, model_size_stats: modelSizeStats }) => {
return {
job_id: jobId,
memory_status: modelSizeStats.memory_status,
log_time: modelSizeStats.log_time,
log_time: dateFormatter(modelSizeStats.log_time),
model_bytes: modelSizeStats.model_bytes,
model_bytes_memory_limit: modelSizeStats.model_bytes_memory_limit,
peak_model_bytes: modelSizeStats.peak_model_bytes,
Expand Down Expand Up @@ -203,13 +225,15 @@ export function jobsHealthServiceProvider(
const defaultLookbackInterval = resolveLookbackInterval(resultJobs, datafeeds!);
const earliestMs = getDelayedDataLookbackTimestamp(timeInterval, defaultLookbackInterval);

const annotations: DelayedDataResponse[] = (
const getFormattedDate = await getDateFormatter();

return (
await annotationService.getDelayedDataAnnotations({
jobIds: resultJobIds,
earliestMs,
})
)
.map<DelayedDataResponse>((v) => {
.map((v) => {
const match = v.annotation.match(/Datafeed has missed (\d+)\s/);
const missedDocsCount = match ? parseInt(match[1], 10) : 0;
return {
Expand All @@ -235,18 +259,39 @@ export function jobsHealthServiceProvider(
v.end_timestamp > getDelayedDataLookbackTimestamp(timeInterval, jobLookbackInterval);

return isDocCountExceededThreshold && isEndTimestampWithinRange;
})
.map((v) => {
return {
...v,
end_timestamp: getFormattedDate(v.end_timestamp),
};
});

return annotations;
},
/**
* Retrieves a list of the latest errors per jobs.
* @param jobIds List of job IDs.
* @param previousStartedAt Time of the previous rule execution. As we intend to notify
* about an error only once, limit the scope of the errors search.
*/
async getErrorsReport(jobIds: string[], previousStartedAt: Date) {
return await jobAuditMessagesService.getJobsErrors(jobIds, previousStartedAt.getTime());
async getErrorsReport(
jobIds: string[],
previousStartedAt: Date
): Promise<JobsErrorsResponse[]> {
const getFormattedDate = await getDateFormatter();

return (
await jobAuditMessagesService.getJobsErrorMessages(jobIds, previousStartedAt.getTime())
).map((v) => {
return {
...v,
errors: v.errors.map((e) => {
return {
...e,
timestamp: getFormattedDate(e.timestamp),
};
}),
};
});
},
/**
* Retrieves report grouped by test.
Expand Down Expand Up @@ -282,7 +327,9 @@ export function jobsHealthServiceProvider(
message: i18n.translate(
'xpack.ml.alertTypes.jobsHealthAlertingRule.datafeedStateMessage',
{
defaultMessage: 'Datafeed is not started for the following jobs:',
defaultMessage:
'Datafeed is not started for {count, plural, one {job} other {jobs}} {jobsString}',
values: getJobsAlertingMessageValues(response),
}
),
},
Expand All @@ -293,30 +340,31 @@ export function jobsHealthServiceProvider(
if (config.mml.enabled) {
const response = await this.getMmlReport(jobIds);
if (response && response.length > 0) {
const hardLimitJobsCount = response.reduce((acc, curr) => {
return acc + (curr.memory_status === 'hard_limit' ? 1 : 0);
}, 0);
const { hard_limit: hardLimitJobs, soft_limit: softLimitJobs } = groupBy(
response,
'memory_status'
);

results.push({
name: HEALTH_CHECK_NAMES.mml.name,
context: {
results: response,
message:
hardLimitJobsCount > 0
hardLimitJobs.length > 0
? i18n.translate(
'xpack.ml.alertTypes.jobsHealthAlertingRule.mmlHardLimitMessage',
{
defaultMessage:
'{jobsCount, plural, one {# job} other {# jobs}} reached the hard model memory limit. Assign the job more memory and restore from a snapshot from prior to reaching the hard limit.',
values: { jobsCount: hardLimitJobsCount },
'{count, plural, one {Job} other {Jobs}} {jobsString} reached the hard model memory limit. Assign the job more memory and restore from a snapshot from prior to reaching the hard limit.',
values: getJobsAlertingMessageValues(hardLimitJobs),
}
)
: i18n.translate(
'xpack.ml.alertTypes.jobsHealthAlertingRule.mmlSoftLimitMessage',
{
defaultMessage:
'{jobsCount, plural, one {# job} other {# jobs}} reached the soft model memory limit. Assign the job more memory or edit the datafeed filter to limit scope of analysis.',
values: { jobsCount: response.length },
'{count, plural, one {Job} other {Jobs}} {jobsString} reached the soft model memory limit. Assign the job more memory or edit the datafeed filter to limit scope of analysis.',
values: getJobsAlertingMessageValues(softLimitJobs),
}
),
},
Expand All @@ -340,8 +388,8 @@ export function jobsHealthServiceProvider(
'xpack.ml.alertTypes.jobsHealthAlertingRule.delayedDataMessage',
{
defaultMessage:
'{jobsCount, plural, one {# job is} other {# jobs are}} suffering from delayed data.',
values: { jobsCount: response.length },
'{count, plural, one {Job} other {Jobs}} {jobsString} {count, plural, one {is} other {are}} suffering from delayed data.',
values: getJobsAlertingMessageValues(response),
}
),
},
Expand All @@ -360,8 +408,8 @@ export function jobsHealthServiceProvider(
'xpack.ml.alertTypes.jobsHealthAlertingRule.errorMessagesMessage',
{
defaultMessage:
'{jobsCount, plural, one {# job contains} other {# jobs contain}} errors in the messages.',
values: { jobsCount: response.length },
'{count, plural, one {Job} other {Jobs}} {jobsString} {count, plural, one {contains} other {contain}} errors in the messages.',
values: getJobsAlertingMessageValues(response),
}
),
},
Expand Down Expand Up @@ -390,12 +438,13 @@ export function getJobsHealthServiceProvider(getGuards: GetGuards) {
return await getGuards(request, savedObjectsClient)
.isFullLicense()
.hasMlCapabilities(['canGetJobs'])
.ok(({ mlClient, scopedClient }) =>
.ok(({ mlClient, scopedClient, getFieldsFormatRegistry }) =>
jobsHealthServiceProvider(
mlClient,
datafeedsProvider(scopedClient, mlClient),
annotationServiceProvider(scopedClient),
jobAuditMessagesProvider(scopedClient, mlClient),
getFieldsFormatRegistry,
logger
).getTestsResults(...args)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import {
AlertInstanceState,
AlertTypeState,
} from '../../../../alerting/common';
import { JobsErrorsResponse } from '../../models/job_audit_messages/job_audit_messages';
import { AlertExecutorOptions } from '../../../../alerting/server';
import type { AlertExecutorOptions } from '../../../../alerting/server';
import type { JobMessage } from '../../../common/types/audit_message';

type ModelSizeStats = MlJobStats['model_size_stats'];

Expand Down Expand Up @@ -51,14 +51,19 @@ export interface DelayedDataResponse {
/** Number of missed documents */
missed_docs_count: number;
/** Timestamp of the latest finalized bucket with missing docs */
end_timestamp: number;
end_timestamp: string;
}

export interface JobsErrorsResponse {
job_id: string;
errors: Array<Omit<JobMessage, 'timestamp'> & { timestamp: string }>;
}

export type AnomalyDetectionJobHealthResult =
| MmlTestResponse
| NotStartedDatafeedResponse
| DelayedDataResponse
| JobsErrorsResponse[number];
| JobsErrorsResponse;

export type AnomalyDetectionJobsHealthAlertContext = {
results: AnomalyDetectionJobHealthResult[];
Expand Down Expand Up @@ -143,7 +148,7 @@ export function registerJobsMonitoringRuleType({
const executionResult = await getTestsResults(options);

if (executionResult.length > 0) {
logger.info(
logger.debug(
`"${name}" rule is scheduling actions for tests: ${executionResult
.map((v) => v.name)
.join(', ')}`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,10 @@ export function jobAuditMessagesProvider(
* Retrieve list of errors per job.
* @param jobIds
*/
async function getJobsErrors(jobIds: string[], earliestMs?: number): Promise<JobsErrorsResponse> {
async function getJobsErrorMessages(
jobIds: string[],
earliestMs?: number
): Promise<JobsErrorsResponse> {
const { body } = await asInternalUser.search({
index: ML_NOTIFICATION_INDEX_PATTERN,
ignore_unavailable: true,
Expand Down Expand Up @@ -471,6 +474,6 @@ export function jobAuditMessagesProvider(
getJobAuditMessages,
getAuditMessagesSummary,
clearJobAuditMessages,
getJobsErrors,
getJobsErrorMessages,
};
}
Loading