From 99b25508d94e29d76816f80a88fa15e2714b8b76 Mon Sep 17 00:00:00 2001 From: Ersin Erdal <92688503+ersin-erdal@users.noreply.github.com> Date: Thu, 19 Dec 2024 10:13:33 +0100 Subject: [PATCH] Reschedule a rule task when there is a cluster_block_exception (#201761) Resolves: https://github.com/elastic/response-ops-team/issues/249 This PR reschedules a rule (i.e. the task behind it) to be executed again in 1m when a `cluster_block_exception` is thrown, which can happen during a reindex. ## To verify: - Run your local Kibana. - Create a user with the `kibana_system` and `kibana_admin` roles. - Log out and log in with your new user. - Create an Always firing rule with a 5m interval. - Use the request below to put a write block on the index used by the Always firing rule: `PUT /.internal.alerts-default.alerts-default-000001/_block/write` - There shouldn't be any errors in the terminal. - Check the task SO (saved object) of the rule (e.g. by querying the task manager index): the value of `task.schedule.interval` should be 1m and the value of `task.state` should remain the same. - Use the request below in the Kibana console to remove the write block. Everything should come back to normal and the alerts of the rule should be saved under the `.internal.alerts-default.alerts-default-000001` index. 
``` PUT /.internal.alerts-default.alerts-default-000001/_settings { "index": { "blocks.write": false } } ``` --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> --- .../alerts_client/alerts_client.test.ts | 37 ++++++++++++++++++ .../server/alerts_client/alerts_client.ts | 23 +++++++++++ .../alerting/server/lib/error_with_type.ts | 39 +++++++++++++++++++ .../server/task_runner/task_runner.test.ts | 34 ++++++++++++++++ .../server/task_runner/task_runner.ts | 8 +++- 5 files changed, 140 insertions(+), 1 deletion(-) create mode 100644 x-pack/plugins/alerting/server/lib/error_with_type.ts diff --git a/x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts b/x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts index 903c8764d801f..557341f3e02de 100644 --- a/x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts +++ b/x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts @@ -1675,6 +1675,43 @@ describe('Alerts Client', () => { expect(clusterClient.bulk).not.toHaveBeenCalled(); expect(maintenanceWindowsService.getMaintenanceWindows).not.toHaveBeenCalled(); }); + + test('should throw an error in case of cluster_block_exception', async () => { + clusterClient.bulk.mockResponseOnce({ + errors: true, + took: 201, + items: [ + { + index: { + _index: '.internal.alerts-default.alerts-default-000001', + _id: '933de4e7-6f99-4df9-b66d-d34b7670d471', + status: 403, + error: { + type: 'cluster_block_exception', + reason: + 'index [.internal.alerts-default.alerts-default-000001] blocked by: [FORBIDDEN/8/index write (api)];', + }, + }, + }, + ], + }); + + const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>( + alertsClientParams + ); + + await alertsClient.initializeExecution(defaultExecutionOpts); + + const alertExecutorService = alertsClient.factory(); + alertExecutorService.create('1').scheduleActions('default'); + + await alertsClient.processAlerts(processAlertsOpts); + 
alertsClient.logAlerts(logAlertsOpts); + + await expect(alertsClient.persistAlerts()).rejects.toThrowError( + 'index [.internal.alerts-default.alerts-default-000001] blocked by: [FORBIDDEN/8/index write (api)];' + ); + }); }); describe('getSummarizedAlerts', () => { diff --git a/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts b/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts index 0c2340ba7cd2d..d62f579e4566e 100644 --- a/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts +++ b/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts @@ -6,6 +6,7 @@ */ import { ElasticsearchClient } from '@kbn/core/server'; + import { ALERT_INSTANCE_ID, ALERT_RULE_UUID, @@ -18,6 +19,8 @@ import { SearchRequest } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import type { Alert } from '@kbn/alerts-as-data-utils'; import { DEFAULT_NAMESPACE_STRING } from '@kbn/core-saved-objects-utils-server'; import { DeepPartial } from '@kbn/utility-types'; +import { BulkResponse } from '@elastic/elasticsearch/lib/api/types'; +import { CLUSTER_BLOCK_EXCEPTION, isClusterBlockError } from '../lib/error_with_type'; import { UntypedNormalizedRuleType } from '../rule_type_registry'; import { SummarizedAlerts, @@ -65,6 +68,7 @@ import { filterMaintenanceWindows, filterMaintenanceWindowsIds, } from '../task_runner/maintenance_windows'; +import { ErrorWithType } from '../lib/error_with_type'; // Term queries can take up to 10,000 terms const CHUNK_SIZE = 10000; @@ -80,6 +84,7 @@ interface AlertsAffectedByMaintenanceWindows { alertIds: string[]; maintenanceWindowIds: string[]; } + export class AlertsClient< AlertData extends RuleAlertData, LegacyState extends AlertInstanceState, @@ -568,6 +573,8 @@ export class AlertsClient< // If there were individual indexing errors, they will be returned in the success response if (response && response.errors) { + this.throwIfHasClusterBlockException(response); + await resolveAlertConflicts({ logger: 
this.options.logger, esClient, @@ -584,6 +591,9 @@ export class AlertsClient< }); } } catch (err) { + if (isClusterBlockError(err)) { + throw err; + } this.options.logger.error( `Error writing ${alertsToIndex.length} alerts to ${this.indexTemplateAndPattern.alias} ${this.ruleInfoMessage} - ${err.message}`, this.logTags @@ -813,4 +823,17 @@ export class AlertsClient< public isUsingDataStreams(): boolean { return this._isUsingDataStreams; } + + private throwIfHasClusterBlockException(response: BulkResponse) { + response.items.forEach((item) => { + const op = item.create || item.index || item.update || item.delete; + if (op?.error && op.error.type === CLUSTER_BLOCK_EXCEPTION) { + throw new ErrorWithType({ + message: op.error.reason || 'Unknown reason', + type: CLUSTER_BLOCK_EXCEPTION, + stack: op.error.stack_trace, + }); + } + }); + } } diff --git a/x-pack/plugins/alerting/server/lib/error_with_type.ts b/x-pack/plugins/alerting/server/lib/error_with_type.ts new file mode 100644 index 0000000000000..9fe3e4e4db80e --- /dev/null +++ b/x-pack/plugins/alerting/server/lib/error_with_type.ts @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ +export const CLUSTER_BLOCK_EXCEPTION = 'cluster_block_exception'; + +export class ErrorWithType extends Error { + public readonly type: string; + + constructor({ + type, + message = 'Unknown error', + stack, + }: { + type: string; + message?: string; + stack?: string; + }) { + super(message); + this.type = type; + this.stack = stack; + } +} + +export function getErrorType(error: Error): string | undefined { + if (isErrorWithType(error)) { + return error.type; + } +} + +export function isErrorWithType(error: Error | ErrorWithType): error is ErrorWithType { + return error instanceof ErrorWithType; +} + +export function isClusterBlockError(err: Error) { + return getErrorType(err) === CLUSTER_BLOCK_EXCEPTION; +} diff --git a/x-pack/plugins/alerting/server/task_runner/task_runner.test.ts b/x-pack/plugins/alerting/server/task_runner/task_runner.test.ts index 4fc02d24fff47..f65a1b57b42d3 100644 --- a/x-pack/plugins/alerting/server/task_runner/task_runner.test.ts +++ b/x-pack/plugins/alerting/server/task_runner/task_runner.test.ts @@ -97,6 +97,7 @@ import * as getExecutorServicesModule from './get_executor_services'; import { rulesSettingsServiceMock } from '../rules_settings/rules_settings_service.mock'; import { maintenanceWindowsServiceMock } from './maintenance_windows/maintenance_windows_service.mock'; import { MaintenanceWindow } from '../application/maintenance_window/types'; +import { ErrorWithType } from '../lib/error_with_type'; jest.mock('uuid', () => ({ v4: () => '5f6aa57d-3e22-484e-bae8-cbed868f4d28', @@ -3221,6 +3222,39 @@ describe('Task Runner', () => { expect(getErrorSource(runnerResult.taskRunError as Error)).toBe(TaskErrorSource.USER); }); + test('reschedules when persistAlerts returns a cluster_block_exception', async () => { + const err = new ErrorWithType({ + message: 'Index is blocked', + type: 'cluster_block_exception', + }); + + alertsClient.persistAlerts.mockRejectedValueOnce(err); + alertsService.createAlertsClient.mockImplementation(() => 
alertsClient); + + const taskRunner = new TaskRunner({ + ruleType, + taskInstance: mockedTaskInstance, + context: taskRunnerFactoryInitializerParams, + inMemoryMetrics, + internalSavedObjectsRepository, + }); + mockGetAlertFromRaw.mockReturnValue(mockedRuleTypeSavedObject as Rule); + encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce(mockedRawRuleSO); + + const runnerResult = await taskRunner.run(); + + expect(getErrorSource(runnerResult.taskRunError as Error)).toBe(TaskErrorSource.FRAMEWORK); + expect(runnerResult.state).toEqual(mockedTaskInstance.state); + expect(runnerResult.schedule!.interval).toEqual('1m'); + expect(runnerResult.taskRunError).toMatchInlineSnapshot('[Error: Index is blocked]'); + expect(logger.debug).toHaveBeenCalledWith( + 'Executing Rule default:test:1 has resulted in Error: Index is blocked', + { + tags: ['1', 'test', 'rule-run-failed', 'framework-error'], + } + ); + }); + function testAlertingEventLogCalls({ ruleContext = alertingEventLoggerInitializer, ruleTypeDef = ruleType, diff --git a/x-pack/plugins/alerting/server/task_runner/task_runner.ts b/x-pack/plugins/alerting/server/task_runner/task_runner.ts index cd351054f9937..425754b24b90e 100644 --- a/x-pack/plugins/alerting/server/task_runner/task_runner.ts +++ b/x-pack/plugins/alerting/server/task_runner/task_runner.ts @@ -72,9 +72,11 @@ import { processRunResults, clearExpiredSnoozes, } from './lib'; +import { isClusterBlockError } from '../lib/error_with_type'; const FALLBACK_RETRY_INTERVAL = '5m'; const CONNECTIVITY_RETRY_INTERVAL = '5m'; +const CLUSTER_BLOCKED_EXCEPTION_RETRY_INTERVAL = '1m'; interface TaskRunnerConstructorParams< Params extends RuleTypeParams, @@ -717,7 +719,7 @@ export class TaskRunner< const errorSource = isUserError(err) ? 
TaskErrorSource.USER : TaskErrorSource.FRAMEWORK; const errorSourceTag = `${errorSource}-error`; - if (isAlertSavedObjectNotFoundError(err, ruleId)) { + if (isAlertSavedObjectNotFoundError(err, ruleId) || isClusterBlockError(err)) { const message = `Executing Rule ${spaceId}:${ this.ruleType.id }:${ruleId} has resulted in Error: ${getEsErrorMessage(err)}`; @@ -757,6 +759,10 @@ export class TaskRunner< : retryInterval; } + if (isClusterBlockError(error)) { + retryInterval = CLUSTER_BLOCKED_EXCEPTION_RETRY_INTERVAL; + } + return { interval: retryInterval }; }), monitoring: this.ruleMonitoring.getMonitoring(),