Skip to content

Commit

Permalink
Reschedule a rule task when there is a cluster_block_exception (elast…
Browse files Browse the repository at this point in the history
…ic#201761)

Resolves: elastic/response-ops-team#249

This PR reschedules a rule (i.e. the task behind it) to be executed again in
1m when a `cluster_block_exception` is thrown, which happens during a
reindex.

## To verify:

- Run your local Kibana,
- Create a user with `kibana_system` and `kibana_admin` roles
- Logout and login with your new user
- Create an Always firing rule with 5m interval
- Use the request below to put a write block on the index used by the Always
firing rule.
   `PUT /.internal.alerts-default.alerts-default-000001/_block/write`
- There shouldn't be any error on the terminal
- Check the task SO of the rule by using the query below: the value of
`task.schedule.interval` should be 1m and the value of `task.state`
should remain the same.
- Use the request below on the Kibana console to remove the write block. Everything
should come back to normal and the alerts of the rule should be saved
under the `.internal.alerts-default.alerts-default-000001` index.
```
PUT /.internal.alerts-default.alerts-default-000001/_settings
{
  "index": {
    "blocks.write": false
  }
}
```

---------

Co-authored-by: kibanamachine <[email protected]>
  • Loading branch information
ersin-erdal and kibanamachine authored Dec 19, 2024
1 parent e4b4101 commit 99b2550
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 1 deletion.
37 changes: 37 additions & 0 deletions x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1675,6 +1675,43 @@ describe('Alerts Client', () => {
expect(clusterClient.bulk).not.toHaveBeenCalled();
expect(maintenanceWindowsService.getMaintenanceWindows).not.toHaveBeenCalled();
});

test('should throw an error in case of cluster_block_exception', async () => {
  // Reason attached to the failed bulk item; it is expected to show up in the error thrown by persistAlerts.
  const blockedReason =
    'index [.internal.alerts-default.alerts-default-000001] blocked by: [FORBIDDEN/8/index write (api)];';

  // The bulk response itself resolves, but flags errors and carries a cluster_block_exception on its only item.
  clusterClient.bulk.mockResponseOnce({
    errors: true,
    took: 201,
    items: [
      {
        index: {
          _index: '.internal.alerts-default.alerts-default-000001',
          _id: '933de4e7-6f99-4df9-b66d-d34b7670d471',
          status: 403,
          error: { type: 'cluster_block_exception', reason: blockedReason },
        },
      },
    ],
  });

  const client = new AlertsClient<{}, {}, {}, 'default', 'recovered'>(alertsClientParams);
  await client.initializeExecution(defaultExecutionOpts);

  // Create alert '1' and schedule its 'default' action group.
  const executorService = client.factory();
  executorService.create('1').scheduleActions('default');

  await client.processAlerts(processAlertsOpts);
  client.logAlerts(logAlertsOpts);

  await expect(client.persistAlerts()).rejects.toThrowError(blockedReason);
});
});

describe('getSummarizedAlerts', () => {
Expand Down
23 changes: 23 additions & 0 deletions x-pack/plugins/alerting/server/alerts_client/alerts_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { ElasticsearchClient } from '@kbn/core/server';

import {
ALERT_INSTANCE_ID,
ALERT_RULE_UUID,
Expand All @@ -18,6 +19,8 @@ import { SearchRequest } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { Alert } from '@kbn/alerts-as-data-utils';
import { DEFAULT_NAMESPACE_STRING } from '@kbn/core-saved-objects-utils-server';
import { DeepPartial } from '@kbn/utility-types';
import { BulkResponse } from '@elastic/elasticsearch/lib/api/types';
import { CLUSTER_BLOCK_EXCEPTION, isClusterBlockError } from '../lib/error_with_type';
import { UntypedNormalizedRuleType } from '../rule_type_registry';
import {
SummarizedAlerts,
Expand Down Expand Up @@ -65,6 +68,7 @@ import {
filterMaintenanceWindows,
filterMaintenanceWindowsIds,
} from '../task_runner/maintenance_windows';
import { ErrorWithType } from '../lib/error_with_type';

// Term queries can take up to 10,000 terms
const CHUNK_SIZE = 10000;
Expand All @@ -80,6 +84,7 @@ interface AlertsAffectedByMaintenanceWindows {
alertIds: string[];
maintenanceWindowIds: string[];
}

export class AlertsClient<
AlertData extends RuleAlertData,
LegacyState extends AlertInstanceState,
Expand Down Expand Up @@ -568,6 +573,8 @@ export class AlertsClient<

// If there were individual indexing errors, they will be returned in the success response
if (response && response.errors) {
this.throwIfHasClusterBlockException(response);

await resolveAlertConflicts({
logger: this.options.logger,
esClient,
Expand All @@ -584,6 +591,9 @@ export class AlertsClient<
});
}
} catch (err) {
if (isClusterBlockError(err)) {
throw err;
}
this.options.logger.error(
`Error writing ${alertsToIndex.length} alerts to ${this.indexTemplateAndPattern.alias} ${this.ruleInfoMessage} - ${err.message}`,
this.logTags
Expand Down Expand Up @@ -813,4 +823,17 @@ export class AlertsClient<
/** Returns the current value of this client's `_isUsingDataStreams` field. */
public isUsingDataStreams(): boolean {
  const { _isUsingDataStreams: usingDataStreams } = this;
  return usingDataStreams;
}

/**
 * Walks the items of a bulk response and throws for the first one whose error
 * type is `CLUSTER_BLOCK_EXCEPTION`.
 *
 * @param response Bulk response whose items are inspected.
 * @throws ErrorWithType of type `CLUSTER_BLOCK_EXCEPTION`, using the item error's
 *   reason (or 'Unknown reason' when it is empty or missing) and its stack trace.
 */
private throwIfHasClusterBlockException(response: BulkResponse) {
  for (const { create, index, update, delete: deleteResult } of response.items) {
    // Each item holds its result under one of the operation keys; take the first that is set.
    const result = create || index || update || deleteResult;
    const itemError = result?.error;

    if (!itemError || itemError.type !== CLUSTER_BLOCK_EXCEPTION) {
      continue;
    }

    throw new ErrorWithType({
      type: CLUSTER_BLOCK_EXCEPTION,
      message: itemError.reason || 'Unknown reason',
      stack: itemError.stack_trace,
    });
  }
}
}
39 changes: 39 additions & 0 deletions x-pack/plugins/alerting/server/lib/error_with_type.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
// Value of an error's `type` field that marks it as a cluster block exception.
export const CLUSTER_BLOCK_EXCEPTION = 'cluster_block_exception';

/**
 * Error that carries a machine-readable `type` in addition to its message.
 *
 * @param type Identifier of the error category (e.g. 'cluster_block_exception').
 * @param message Human-readable message; defaults to 'Unknown error'.
 * @param stack Optional stack trace to report instead of the one captured at construction.
 */
export class ErrorWithType extends Error {
  public readonly type: string;

  constructor({
    type,
    message = 'Unknown error',
    stack,
  }: {
    type: string;
    message?: string;
    stack?: string;
  }) {
    super(message);
    this.type = type;
    // Only override the stack when one was supplied; assigning `undefined`
    // would discard the trace captured by the Error constructor.
    if (stack !== undefined) {
      this.stack = stack;
    }
  }
}

/**
 * Reads the `type` of an error.
 *
 * @param error Error to inspect.
 * @returns The error's `type` when it is an ErrorWithType, otherwise `undefined`.
 */
export function getErrorType(error: Error): string | undefined {
  return isErrorWithType(error) ? error.type : undefined;
}

/**
 * Type guard for ErrorWithType.
 *
 * @param error Error to test.
 * @returns `true` when `error` is an instance of ErrorWithType.
 */
export function isErrorWithType(error: Error | ErrorWithType): error is ErrorWithType {
  const isTyped = error instanceof ErrorWithType;
  return isTyped;
}

/**
 * Tells whether an error is typed as a cluster block exception.
 *
 * @param err Error to test.
 * @returns `true` when the error's type equals `CLUSTER_BLOCK_EXCEPTION`.
 */
export function isClusterBlockError(err: Error) {
  const errorType = getErrorType(err);
  return errorType === CLUSTER_BLOCK_EXCEPTION;
}
34 changes: 34 additions & 0 deletions x-pack/plugins/alerting/server/task_runner/task_runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ import * as getExecutorServicesModule from './get_executor_services';
import { rulesSettingsServiceMock } from '../rules_settings/rules_settings_service.mock';
import { maintenanceWindowsServiceMock } from './maintenance_windows/maintenance_windows_service.mock';
import { MaintenanceWindow } from '../application/maintenance_window/types';
import { ErrorWithType } from '../lib/error_with_type';

jest.mock('uuid', () => ({
v4: () => '5f6aa57d-3e22-484e-bae8-cbed868f4d28',
Expand Down Expand Up @@ -3221,6 +3222,39 @@ describe('Task Runner', () => {
expect(getErrorSource(runnerResult.taskRunError as Error)).toBe(TaskErrorSource.USER);
});

test('reschedules when persistAlerts returns a cluster_block_exception', async () => {
  // Make persistAlerts reject once with an error typed as cluster_block_exception.
  const clusterBlockError = new ErrorWithType({
    type: 'cluster_block_exception',
    message: 'Index is blocked',
  });
  alertsClient.persistAlerts.mockRejectedValueOnce(clusterBlockError);
  alertsService.createAlertsClient.mockImplementation(() => alertsClient);

  const runner = new TaskRunner({
    ruleType,
    taskInstance: mockedTaskInstance,
    context: taskRunnerFactoryInitializerParams,
    inMemoryMetrics,
    internalSavedObjectsRepository,
  });
  mockGetAlertFromRaw.mockReturnValue(mockedRuleTypeSavedObject as Rule);
  encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce(mockedRawRuleSO);

  const { taskRunError, state, schedule } = await runner.run();

  // The failure is reported as a framework error, the task state is left as it was,
  // and the next run is scheduled in 1m.
  expect(getErrorSource(taskRunError as Error)).toBe(TaskErrorSource.FRAMEWORK);
  expect(state).toEqual(mockedTaskInstance.state);
  expect(schedule!.interval).toEqual('1m');
  expect(taskRunError).toMatchInlineSnapshot('[Error: Index is blocked]');

  const expectedTags = ['1', 'test', 'rule-run-failed', 'framework-error'];
  expect(logger.debug).toHaveBeenCalledWith(
    'Executing Rule default:test:1 has resulted in Error: Index is blocked',
    { tags: expectedTags }
  );
});

function testAlertingEventLogCalls({
ruleContext = alertingEventLoggerInitializer,
ruleTypeDef = ruleType,
Expand Down
8 changes: 7 additions & 1 deletion x-pack/plugins/alerting/server/task_runner/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ import {
processRunResults,
clearExpiredSnoozes,
} from './lib';
import { isClusterBlockError } from '../lib/error_with_type';

// NOTE(review): presumably retry intervals for failed rule runs; the uses of the two
// '5m' values are outside this excerpt — confirm.
const FALLBACK_RETRY_INTERVAL = '5m';
const CONNECTIVITY_RETRY_INTERVAL = '5m';
// Retry interval used when the run failed with a cluster block error.
const CLUSTER_BLOCKED_EXCEPTION_RETRY_INTERVAL = '1m';

interface TaskRunnerConstructorParams<
Params extends RuleTypeParams,
Expand Down Expand Up @@ -717,7 +719,7 @@ export class TaskRunner<
const errorSource = isUserError(err) ? TaskErrorSource.USER : TaskErrorSource.FRAMEWORK;
const errorSourceTag = `${errorSource}-error`;

if (isAlertSavedObjectNotFoundError(err, ruleId)) {
if (isAlertSavedObjectNotFoundError(err, ruleId) || isClusterBlockError(err)) {
const message = `Executing Rule ${spaceId}:${
this.ruleType.id
}:${ruleId} has resulted in Error: ${getEsErrorMessage(err)}`;
Expand Down Expand Up @@ -757,6 +759,10 @@ export class TaskRunner<
: retryInterval;
}

if (isClusterBlockError(error)) {
retryInterval = CLUSTER_BLOCKED_EXCEPTION_RETRY_INTERVAL;
}

return { interval: retryInterval };
}),
monitoring: this.ruleMonitoring.getMonitoring(),
Expand Down

0 comments on commit 99b2550

Please sign in to comment.