diff --git a/x-pack/plugins/ml/common/constants/alerts.ts b/x-pack/plugins/ml/common/constants/alerts.ts index 2192b2b504b59..1b373b2ec435b 100644 --- a/x-pack/plugins/ml/common/constants/alerts.ts +++ b/x-pack/plugins/ml/common/constants/alerts.ts @@ -54,12 +54,12 @@ export const HEALTH_CHECK_NAMES: Record { enabled: true, timeInterval: null, }, + errorMessages: { + enabled: true, + }, }); }); test('returns config with overridden values based on provided configuration', () => { @@ -119,6 +122,9 @@ describe('getResultJobsHealthRuleConfig', () => { enabled: true, timeInterval: null, }, + errorMessages: { + enabled: true, + }, }); }); }); diff --git a/x-pack/plugins/ml/common/util/alerts.ts b/x-pack/plugins/ml/common/util/alerts.ts index 7328c2a4dcc71..6abc5333a1f73 100644 --- a/x-pack/plugins/ml/common/util/alerts.ts +++ b/x-pack/plugins/ml/common/util/alerts.ts @@ -54,7 +54,7 @@ export function getTopNBuckets(job: Job): number { return Math.ceil(narrowBucketLength / bucketSpan.asSeconds()); } -const implementedTests = ['datafeed', 'mml', 'delayedData'] as JobsHealthTests[]; +const implementedTests = ['datafeed', 'mml', 'delayedData', 'errorMessages'] as JobsHealthTests[]; /** * Returns tests configuration combined with default values. diff --git a/x-pack/plugins/ml/public/alerting/jobs_health_rule/register_jobs_health_alerting_rule.ts b/x-pack/plugins/ml/public/alerting/jobs_health_rule/register_jobs_health_alerting_rule.ts index f6446b454a877..a5f433bcc3752 100644 --- a/x-pack/plugins/ml/public/alerting/jobs_health_rule/register_jobs_health_alerting_rule.ts +++ b/x-pack/plugins/ml/public/alerting/jobs_health_rule/register_jobs_health_alerting_rule.ts @@ -21,7 +21,8 @@ export function registerJobsHealthAlertingRule( triggersActionsUi.ruleTypeRegistry.register({ id: ML_ALERT_TYPES.AD_JOBS_HEALTH, description: i18n.translate('xpack.ml.alertTypes.jobsHealthAlertingRule.description', { - defaultMessage: 'Alert when anomaly detection jobs experience operational issues.', + defaultMessage: + 'Alert when anomaly detection jobs experience operational issues. Enable suitable alerts for critically important jobs.', }), iconClass: 'bell', documentationUrl(docLinks) { @@ -90,14 +91,15 @@ export function registerJobsHealthAlertingRule( \\{\\{context.message\\}\\} \\{\\{#context.results\\}\\} Job ID: \\{\\{job_id\\}\\} - \\{\\{#datafeed_id\\}\\}Datafeed ID: \\{\\{datafeed_id\\}\\} \\{\\{/datafeed_id\\}\\} - \\{\\{#datafeed_state\\}\\}Datafeed state: \\{\\{datafeed_state\\}\\} \\{\\{/datafeed_state\\}\\} - \\{\\{#memory_status\\}\\}Memory status: \\{\\{memory_status\\}\\} \\{\\{/memory_status\\}\\} - \\{\\{#log_time\\}\\}Memory logging time: \\{\\{log_time\\}\\} \\{\\{/log_time\\}\\} - \\{\\{#failed_category_count\\}\\}Failed category count: \\{\\{failed_category_count\\}\\} \\{\\{/failed_category_count\\}\\} - \\{\\{#annotation\\}\\}Annotation: \\{\\{annotation\\}\\} \\{\\{/annotation\\}\\} - \\{\\{#missed_docs_count\\}\\}Number of missed documents: \\{\\{missed_docs_count\\}\\} \\{\\{/missed_docs_count\\}\\} - \\{\\{#end_timestamp\\}\\}Latest finalized bucket with missing docs: \\{\\{end_timestamp\\}\\} \\{\\{/end_timestamp\\}\\} + \\{\\{#datafeed_id\\}\\}Datafeed ID: \\{\\{datafeed_id\\}\\} + \\{\\{/datafeed_id\\}\\} \\{\\{#datafeed_state\\}\\}Datafeed state: \\{\\{datafeed_state\\}\\} + \\{\\{/datafeed_state\\}\\} \\{\\{#memory_status\\}\\}Memory status: \\{\\{memory_status\\}\\} + \\{\\{/memory_status\\}\\} \\{\\{#log_time\\}\\}Memory logging time: \\{\\{log_time\\}\\} + \\{\\{/log_time\\}\\} \\{\\{#failed_category_count\\}\\}Failed category count: \\{\\{failed_category_count\\}\\} + \\{\\{/failed_category_count\\}\\} \\{\\{#annotation\\}\\}Annotation: \\{\\{annotation\\}\\} + \\{\\{/annotation\\}\\} \\{\\{#missed_docs_count\\}\\}Number of missed documents: \\{\\{missed_docs_count\\}\\} + \\{\\{/missed_docs_count\\}\\} \\{\\{#end_timestamp\\}\\}Latest finalized bucket with missing docs: \\{\\{end_timestamp\\}\\} + \\{\\{/end_timestamp\\}\\} \\{\\{#errors\\}\\}Error message: \\{\\{message\\}\\} \\{\\{/errors\\}\\} \\{\\{/context.results\\}\\} `, } diff --git a/x-pack/plugins/ml/server/lib/alerts/jobs_health_service.test.ts b/x-pack/plugins/ml/server/lib/alerts/jobs_health_service.test.ts index b345cf8c1245c..ffaa26fc949ee 100644 --- a/x-pack/plugins/ml/server/lib/alerts/jobs_health_service.test.ts +++ b/x-pack/plugins/ml/server/lib/alerts/jobs_health_service.test.ts @@ -11,9 +11,39 @@ import type { Logger } from 'kibana/server'; import { MlClient } from '../ml_client'; import { MlJob, MlJobStats } from '@elastic/elasticsearch/api/types'; import { AnnotationService } from '../../models/annotation_service/annotation'; +import { JobsHealthExecutorOptions } from './register_jobs_monitoring_rule_type'; +import { JobAuditMessagesService } from '../../models/job_audit_messages/job_audit_messages'; +import { DeepPartial } from '../../../common/types/common'; const MOCK_DATE_NOW = 1487076708000; +function getDefaultExecutorOptions( + overrides: DeepPartial = {} +): JobsHealthExecutorOptions { + return ({ + state: {}, + startedAt: new Date('2021-08-12T13:13:39.396Z'), + previousStartedAt: new Date('2021-08-12T13:13:27.396Z'), + spaceId: 'default', + namespace: undefined, + name: 'ml-health-check', + tags: [], + createdBy: 'elastic', + updatedBy: 'elastic', + rule: { + name: 'ml-health-check', + tags: [], + consumer: 'alerts', + producer: 'ml', + ruleTypeId: 'xpack.ml.anomaly_detection_jobs_health', + ruleTypeName: 'Anomaly detection jobs health', + enabled: true, + schedule: { interval: '10s' }, + }, + ...overrides, + } as unknown) as JobsHealthExecutorOptions; +} + describe('JobsHealthService', () => { const mlClient = ({ getJobs: jest.fn().mockImplementation(({ job_id: jobIds = [] }) => { @@ -117,6 +147,12 @@ describe('JobsHealthService', () => { }), } as unknown) as jest.Mocked; + const jobAuditMessagesService = ({ + getJobsErrors: jest.fn().mockImplementation((jobIds: string) => { + return Promise.resolve({}); + }), + } as unknown) as jest.Mocked; + const logger = ({ warn: jest.fn(), info: jest.fn(), @@ -127,6 +163,7 @@ describe('JobsHealthService', () => { mlClient, datafeedsService, annotationService, + jobAuditMessagesService, logger ); @@ -143,42 +180,52 @@ describe('JobsHealthService', () => { test('returns empty results when no jobs provided', async () => { // act - const executionResult = await jobHealthService.getTestsResults('testRule', { - testsConfig: null, - includeJobs: { - jobIds: ['*'], - groupIds: [], - }, - excludeJobs: null, - }); + const executionResult = await jobHealthService.getTestsResults( + getDefaultExecutorOptions({ + rule: { name: 'testRule' }, + params: { + testsConfig: null, + includeJobs: { + jobIds: ['*'], + groupIds: [], + }, + excludeJobs: null, + }, + }) + ); expect(logger.warn).toHaveBeenCalledWith('Rule "testRule" does not have associated jobs.'); expect(datafeedsService.getDatafeedByJobId).not.toHaveBeenCalled(); expect(executionResult).toEqual([]); }); test('returns empty results and does not perform datafeed check when test is disabled', async () => { - const executionResult = await jobHealthService.getTestsResults('testRule', { - testsConfig: { - datafeed: { - enabled: false, - }, - behindRealtime: null, - delayedData: { - enabled: false, - docsCount: null, - timeInterval: null, - }, - errorMessages: null, - mml: { - enabled: false, + const executionResult = await jobHealthService.getTestsResults( + getDefaultExecutorOptions({ + rule: { name: 'testRule' }, + params: { + testsConfig: { + datafeed: { + enabled: false, + }, + behindRealtime: null, + delayedData: { + enabled: false, + docsCount: null, + timeInterval: null, + }, + errorMessages: null, + mml: { + enabled: false, + }, + }, + includeJobs: { + jobIds: ['test_job_01'], + groupIds: [], + }, + excludeJobs: null, }, - }, - includeJobs: { - jobIds: ['test_job_01'], - groupIds: [], - }, - excludeJobs: null, - }); + }) + ); expect(logger.warn).not.toHaveBeenCalled(); expect(logger.debug).toHaveBeenCalledWith(`Performing health checks for job IDs: test_job_01`); expect(datafeedsService.getDatafeedByJobId).not.toHaveBeenCalled(); @@ -186,27 +233,32 @@ describe('JobsHealthService', () => { }); test('takes into account delayed data params', async () => { - const executionResult = await jobHealthService.getTestsResults('testRule_04', { - testsConfig: { - delayedData: { - enabled: true, - docsCount: 10, - timeInterval: '4h', + const executionResult = await jobHealthService.getTestsResults( + getDefaultExecutorOptions({ + rule: { name: 'testRule_04' }, + params: { + testsConfig: { + delayedData: { + enabled: true, + docsCount: 10, + timeInterval: '4h', + }, + behindRealtime: { enabled: false, timeInterval: null }, + mml: { enabled: false }, + datafeed: { enabled: false }, + errorMessages: { enabled: false }, + }, + includeJobs: { + jobIds: [], + groupIds: ['test_group'], + }, + excludeJobs: { + jobIds: ['test_job_03'], + groupIds: [], + }, }, - behindRealtime: { enabled: false, timeInterval: null }, - mml: { enabled: false }, - datafeed: { enabled: false }, - errorMessages: { enabled: false }, - }, - includeJobs: { - jobIds: [], - groupIds: ['test_group'], - }, - excludeJobs: { - jobIds: ['test_job_03'], - groupIds: [], - }, - }); + }) + ); expect(annotationService.getDelayedDataAnnotations).toHaveBeenCalledWith({ jobIds: ['test_job_01', 'test_job_02'], @@ -234,17 +286,22 @@ describe('JobsHealthService', () => { }); test('returns results based on provided selection', async () => { - const executionResult = await jobHealthService.getTestsResults('testRule_03', { - testsConfig: null, - includeJobs: { - jobIds: [], - groupIds: ['test_group'], - }, - excludeJobs: { - jobIds: ['test_job_03'], - groupIds: [], - }, - }); + const executionResult = await jobHealthService.getTestsResults( + getDefaultExecutorOptions({ + rule: { name: 'testRule_03' }, + params: { + testsConfig: null, + includeJobs: { + jobIds: [], + groupIds: ['test_group'], + }, + excludeJobs: { + jobIds: ['test_job_03'], + groupIds: [], + }, + }, + }) + ); expect(logger.warn).not.toHaveBeenCalled(); expect(logger.debug).toHaveBeenCalledWith( `Performing health checks for job IDs: test_job_01, test_job_02` diff --git a/x-pack/plugins/ml/server/lib/alerts/jobs_health_service.ts b/x-pack/plugins/ml/server/lib/alerts/jobs_health_service.ts index 52e17fed7a414..bcae57e558573 100644 --- a/x-pack/plugins/ml/server/lib/alerts/jobs_health_service.ts +++ b/x-pack/plugins/ml/server/lib/alerts/jobs_health_service.ts @@ -11,10 +11,7 @@ import { i18n } from '@kbn/i18n'; import { Logger } from 'kibana/server'; import { MlJob } from '@elastic/elasticsearch/api/types'; import { MlClient } from '../ml_client'; -import { - AnomalyDetectionJobsHealthRuleParams, - JobSelection, -} from '../../routes/schemas/alerting_schema'; +import { JobSelection } from '../../routes/schemas/alerting_schema'; import { datafeedsProvider, DatafeedsService } from '../../models/job_service/datafeeds'; import { ALL_JOBS_SELECTION, HEALTH_CHECK_NAMES } from '../../../common/constants/alerts'; import { DatafeedStats } from '../../../common/types/anomaly_detection_jobs'; @@ -22,6 +19,7 @@ import { GetGuards } from '../../shared_services/shared_services'; import { AnomalyDetectionJobsHealthAlertContext, DelayedDataResponse, + JobsHealthExecutorOptions, MmlTestResponse, NotStartedDatafeedResponse, } from './register_jobs_monitoring_rule_type'; @@ -33,6 +31,10 @@ import { AnnotationService } from '../../models/annotation_service/annotation'; import { annotationServiceProvider } from '../../models/annotation_service'; import { parseInterval } from '../../../common/util/parse_interval'; import { isDefined } from '../../../common/types/guards'; +import { + jobAuditMessagesProvider, + JobAuditMessagesService, +} from '../../models/job_audit_messages/job_audit_messages'; interface TestResult { name: string; @@ -45,6 +47,7 @@ export function jobsHealthServiceProvider( mlClient: MlClient, datafeedsService: DatafeedsService, annotationService: AnnotationService, + jobAuditMessagesService: JobAuditMessagesService, logger: Logger ) { /** @@ -236,13 +239,25 @@ export function jobsHealthServiceProvider( return annotations; }, + /** + * Retrieves a list of the latest errors per jobs. + * @param jobIds List of job IDs. + * @param previousStartedAt Time of the previous rule execution. As we intend to notify + * about an error only once, limit the scope of the errors search. + */ + async getErrorsReport(jobIds: string[], previousStartedAt: Date) { + return await jobAuditMessagesService.getJobsErrors(jobIds, previousStartedAt.getTime()); + }, /** * Retrieves report grouped by test. */ - async getTestsResults( - ruleInstanceName: string, - { testsConfig, includeJobs, excludeJobs }: AnomalyDetectionJobsHealthRuleParams - ): Promise { + async getTestsResults(executorOptions: JobsHealthExecutorOptions): Promise { + const { + rule, + previousStartedAt, + params: { testsConfig, includeJobs, excludeJobs }, + } = executorOptions; + const config = getResultJobsHealthRuleConfig(testsConfig); const results: TestsResults = []; @@ -251,7 +266,7 @@ export function jobsHealthServiceProvider( const jobIds = getJobIds(jobs); if (jobIds.length === 0) { - logger.warn(`Rule "${ruleInstanceName}" does not have associated jobs.`); + logger.warn(`Rule "${rule.name}" does not have associated jobs.`); return results; } @@ -334,6 +349,26 @@ export function jobsHealthServiceProvider( } } + if (config.errorMessages.enabled && previousStartedAt) { + const response = await this.getErrorsReport(jobIds, previousStartedAt); + if (response.length > 0) { + results.push({ + name: HEALTH_CHECK_NAMES.errorMessages.name, + context: { + results: response, + message: i18n.translate( + 'xpack.ml.alertTypes.jobsHealthAlertingRule.errorMessagesMessage', + { + defaultMessage: + '{jobsCount, plural, one {# job contains} other {# jobs contain}} errors in the messages.', + values: { jobsCount: response.length }, + } + ), + }, + }); + } + } + return results; }, }; @@ -360,6 +395,7 @@ export function getJobsHealthServiceProvider(getGuards: GetGuards) { mlClient, datafeedsProvider(scopedClient, mlClient), annotationServiceProvider(scopedClient), + jobAuditMessagesProvider(scopedClient, mlClient), logger ).getTestsResults(...args) ); diff --git a/x-pack/plugins/ml/server/lib/alerts/register_jobs_monitoring_rule_type.ts b/x-pack/plugins/ml/server/lib/alerts/register_jobs_monitoring_rule_type.ts index 063d8ad5a8980..c49c169d3bd21 100644 --- a/x-pack/plugins/ml/server/lib/alerts/register_jobs_monitoring_rule_type.ts +++ b/x-pack/plugins/ml/server/lib/alerts/register_jobs_monitoring_rule_type.ts @@ -22,6 +22,8 @@ import { AlertInstanceState, AlertTypeState, } from '../../../../alerting/common'; +import { JobsErrorsResponse } from '../../models/job_audit_messages/job_audit_messages'; +import { AlertExecutorOptions } from '../../../../alerting/server'; type ModelSizeStats = MlJobStats['model_size_stats']; @@ -55,7 +57,8 @@ export interface DelayedDataResponse { export type AnomalyDetectionJobHealthResult = | MmlTestResponse | NotStartedDatafeedResponse - | DelayedDataResponse; + | DelayedDataResponse + | JobsErrorsResponse[number]; export type AnomalyDetectionJobsHealthAlertContext = { results: AnomalyDetectionJobHealthResult[]; @@ -69,10 +72,18 @@ export type AnomalyDetectionJobRealtimeIssue = typeof ANOMALY_DETECTION_JOB_REAL export const REALTIME_ISSUE_DETECTED: ActionGroup = { id: ANOMALY_DETECTION_JOB_REALTIME_ISSUE, name: i18n.translate('xpack.ml.jobsHealthAlertingRule.actionGroupName', { - defaultMessage: 'Real-time issue detected', + defaultMessage: 'Issue detected', }), }; +export type JobsHealthExecutorOptions = AlertExecutorOptions< + AnomalyDetectionJobsHealthRuleParams, + Record, + Record, + AnomalyDetectionJobsHealthAlertContext, + AnomalyDetectionJobRealtimeIssue +>; + export function registerJobsMonitoringRuleType({ alerting, mlServicesProviders, @@ -120,14 +131,16 @@ export function registerJobsMonitoringRuleType({ producer: PLUGIN_ID, minimumLicenseRequired: MINIMUM_FULL_LICENSE, isExportable: true, - async executor({ services, params, alertId, state, previousStartedAt, startedAt, name, rule }) { + async executor(options) { + const { services, name } = options; + const fakeRequest = {} as KibanaRequest; const { getTestsResults } = mlServicesProviders.jobsHealthServiceProvider( services.savedObjectsClient, fakeRequest, logger ); - const executionResult = await getTestsResults(name, params); + const executionResult = await getTestsResults(options); if (executionResult.length > 0) { logger.info( diff --git a/x-pack/plugins/ml/server/models/job_audit_messages/job_audit_messages.ts b/x-pack/plugins/ml/server/models/job_audit_messages/job_audit_messages.ts index 98ed76319a0f7..fcda1a2a3ea73 100644 --- a/x-pack/plugins/ml/server/models/job_audit_messages/job_audit_messages.ts +++ b/x-pack/plugins/ml/server/models/job_audit_messages/job_audit_messages.ts @@ -54,6 +54,10 @@ export function isClearable(index?: string): boolean { return false; } +export type JobsErrorsResponse = Array<{ job_id: string; errors: JobMessage[] }>; + +export type JobAuditMessagesService = ReturnType; + export function jobAuditMessagesProvider( { asInternalUser }: IScopedClusterClient, mlClient: MlClient @@ -178,7 +182,10 @@ export function jobAuditMessagesProvider( return { messages, notificationIndices }; } - // search highest, most recent audit messages for all jobs for the last 24hrs. + /** + * Search highest, most recent audit messages for all jobs for the last 24hrs. + * @param jobIds + */ async function getAuditMessagesSummary(jobIds: string[]): Promise { // TODO This is the current default value of the cluster setting `search.max_buckets`. // This should possibly consider the real settings in a future update. @@ -400,9 +407,70 @@ export function jobAuditMessagesProvider( return (Object.keys(LEVEL) as LevelName[])[Object.values(LEVEL).indexOf(level)]; } + /** + * Retrieve list of errors per job. + * @param jobIds + */ + async function getJobsErrors(jobIds: string[], earliestMs?: number): Promise { + const { body } = await asInternalUser.search({ + index: ML_NOTIFICATION_INDEX_PATTERN, + ignore_unavailable: true, + size: 0, + body: { + query: { + bool: { + filter: [ + ...(earliestMs ? [{ range: { timestamp: { gte: earliestMs } } }] : []), + { terms: { job_id: jobIds } }, + { + term: { level: { value: MESSAGE_LEVEL.ERROR } }, + }, + ], + }, + }, + aggs: { + by_job: { + terms: { + field: 'job_id', + size: jobIds.length, + }, + aggs: { + latest_errors: { + top_hits: { + size: 10, + sort: [ + { + timestamp: { + order: 'desc', + }, + }, + ], + }, + }, + }, + }, + }, + }, + }); + + const errors = body.aggregations!.by_job as estypes.AggregationsTermsAggregate<{ + key: string; + doc_count: number; + latest_errors: Pick, 'hits'>; + }>; + + return errors.buckets.map((bucket) => { + return { + job_id: bucket.key, + errors: bucket.latest_errors.hits.hits.map((v) => v._source!), + }; + }); + } + return { getJobAuditMessages, getAuditMessagesSummary, clearJobAuditMessages, + getJobsErrors, }; }