From 37034dea2c21b4e29b14e8caba1f9b8a4a0d5930 Mon Sep 17 00:00:00 2001 From: Patrick Mueller Date: Mon, 28 Sep 2020 18:19:23 -0400 Subject: [PATCH] [Alerting] retry internal OCC calls within alertsClient (#77838) (#78688) During development of https://github.com/elastic/kibana/pull/75553, some issues came up with the optimistic concurrency control (OCC) we were using internally within the alertsClient, via the `version` option/property of the saved object. The referenced PR updates new fields in the alert from the taskManager task after the alertType executor runs. In some alertsClient methods, OCC is used to update the alert which are requested via user requests. And so in some cases, version conflict errors were coming up when the alert was updated by task manager, in the middle of one of these methods. Note: the SIEM function test cases stress test this REALLY well. In this PR, we wrap all the methods using OCC with a function that will retry them, a short number of times, with a short delay in between. If the original method STILL has a conflict error, it will get thrown after the retry limit. In practice, this eliminated the version conflict calls that were occurring with the SIEM tests, once we started updating the saved object in the executor. For cases where we know only attributes not contributing to AAD are being updated, a new function is provided that does a partial update on just those attributes, making partial updates for those attributes a bit safer. That will be also used by PR #75553. --- .../alerts/server/alerts_client.test.ts | 35 +- x-pack/plugins/alerts/server/alerts_client.ts | 126 ++++-- .../alerts_client_conflict_retries.test.ts | 359 ++++++++++++++++++ .../server/lib/retry_if_conflicts.test.ts | 78 ++++ .../alerts/server/lib/retry_if_conflicts.ts | 58 +++ .../alerts/server/saved_objects/index.ts | 17 + .../partially_update_alert.test.ts | 112 ++++++ .../saved_objects/partially_update_alert.ts | 49 +++ 8 files changed, 801 insertions(+), 33 deletions(-) create mode 100644 x-pack/plugins/alerts/server/alerts_client_conflict_retries.test.ts create mode 100644 x-pack/plugins/alerts/server/lib/retry_if_conflicts.test.ts create mode 100644 x-pack/plugins/alerts/server/lib/retry_if_conflicts.ts create mode 100644 x-pack/plugins/alerts/server/saved_objects/partially_update_alert.test.ts create mode 100644 x-pack/plugins/alerts/server/saved_objects/partially_update_alert.ts diff --git a/x-pack/plugins/alerts/server/alerts_client.test.ts b/x-pack/plugins/alerts/server/alerts_client.test.ts index a6cffb0284815..d4817eab64acb 100644 --- a/x-pack/plugins/alerts/server/alerts_client.test.ts +++ b/x-pack/plugins/alerts/server/alerts_client.test.ts @@ -1696,14 +1696,22 @@ describe('muteAll()', () => { muteAll: false, }, references: [], + version: '123', }); await alertsClient.muteAll({ id: '1' }); - expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith('alert', '1', { - muteAll: true, - mutedInstanceIds: [], - updatedBy: 'elastic', - }); + expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith( + 'alert', + '1', + { + muteAll: true, + mutedInstanceIds: [], + updatedBy: 'elastic', + }, + { + version: '123', + } + ); }); describe('authorization', () => { @@ -1785,11 +1793,18 @@ describe('unmuteAll()', () => { }); await alertsClient.unmuteAll({ id: '1' }); - expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith('alert', '1', { - muteAll: false, - mutedInstanceIds: [], - updatedBy: 'elastic', - }); + expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith( + 'alert', + '1', + { + muteAll: false, + mutedInstanceIds: [], + updatedBy: 'elastic', + }, + { + version: '123', + } + ); }); describe('authorization', () => { diff --git a/x-pack/plugins/alerts/server/alerts_client.ts b/x-pack/plugins/alerts/server/alerts_client.ts index 671b1d6411d7f..033fdd752c695 100644 --- a/x-pack/plugins/alerts/server/alerts_client.ts +++ b/x-pack/plugins/alerts/server/alerts_client.ts @@ -45,6 +45,8 @@ import { parseIsoOrRelativeDate } from './lib/iso_or_relative_date'; import { alertInstanceSummaryFromEventLog } from './lib/alert_instance_summary_from_event_log'; import { IEvent } from '../../event_log/server'; import { parseDuration } from '../common/parse_duration'; +import { retryIfConflicts } from './lib/retry_if_conflicts'; +import { partiallyUpdateAlert } from './saved_objects'; export interface RegistryAlertTypeWithAuth extends RegistryAlertType { authorizedConsumers: string[]; @@ -421,6 +423,14 @@ export class AlertsClient { } public async update({ id, data }: UpdateOptions): Promise { + return await retryIfConflicts( + this.logger, + `alertsClient.update('${id}')`, + async () => await this.updateWithOCC({ id, data }) + ); + } + + private async updateWithOCC({ id, data }: UpdateOptions): Promise { let alertSavedObject: SavedObject; try { @@ -529,7 +539,15 @@ export class AlertsClient { }; } - public async updateApiKey({ id }: { id: string }) { + public async updateApiKey({ id }: { id: string }): Promise { + return await retryIfConflicts( + this.logger, + `alertsClient.updateApiKey('${id}')`, + async () => await this.updateApiKeyWithOCC({ id }) + ); + } + + private async updateApiKeyWithOCC({ id }: { id: string }) { let apiKeyToInvalidate: string | null = null; let attributes: RawAlert; let version: string | undefined; @@ -597,7 +615,15 @@ export class AlertsClient { } } - public async enable({ id }: { id: string }) { + public async enable({ id }: { id: string }): Promise { + return await retryIfConflicts( + this.logger, + `alertsClient.enable('${id}')`, + async () => await this.enableWithOCC({ id }) + ); + } + + private async enableWithOCC({ id }: { id: string }) { let apiKeyToInvalidate: string | null = null; let attributes: RawAlert; let version: string | undefined; @@ -658,7 +684,15 @@ export class AlertsClient { } } - public async disable({ id }: { id: string }) { + public async disable({ id }: { id: string }): Promise { + return await retryIfConflicts( + this.logger, + `alertsClient.disable('${id}')`, + async () => await this.disableWithOCC({ id }) + ); + } + + private async disableWithOCC({ id }: { id: string }) { let apiKeyToInvalidate: string | null = null; let attributes: RawAlert; let version: string | undefined; @@ -711,8 +745,19 @@ export class AlertsClient { } } - public async muteAll({ id }: { id: string }) { - const { attributes } = await this.unsecuredSavedObjectsClient.get('alert', id); + public async muteAll({ id }: { id: string }): Promise { + return await retryIfConflicts( + this.logger, + `alertsClient.muteAll('${id}')`, + async () => await this.muteAllWithOCC({ id }) + ); + } + + private async muteAllWithOCC({ id }: { id: string }) { + const { attributes, version } = await this.unsecuredSavedObjectsClient.get( + 'alert', + id + ); await this.authorization.ensureAuthorized( attributes.alertTypeId, attributes.consumer, @@ -723,19 +768,34 @@ export class AlertsClient { await this.actionsAuthorization.ensureAuthorized('execute'); } - await this.unsecuredSavedObjectsClient.update( - 'alert', + const updateAttributes = this.updateMeta({ + muteAll: true, + mutedInstanceIds: [], + updatedBy: await this.getUserName(), + }); + const updateOptions = { version }; + + await partiallyUpdateAlert( + this.unsecuredSavedObjectsClient, id, - this.updateMeta({ - muteAll: true, - mutedInstanceIds: [], - updatedBy: await this.getUserName(), - }) + updateAttributes, + updateOptions + ); + } + + public async unmuteAll({ id }: { id: string }): Promise { + return await retryIfConflicts( + this.logger, + `alertsClient.unmuteAll('${id}')`, + async () => await this.unmuteAllWithOCC({ id }) ); } - public async unmuteAll({ id }: { id: string }) { - const { attributes } = await this.unsecuredSavedObjectsClient.get('alert', id); + private async unmuteAllWithOCC({ id }: { id: string }) { + const { attributes, version } = await this.unsecuredSavedObjectsClient.get( + 'alert', + id + ); await this.authorization.ensureAuthorized( attributes.alertTypeId, attributes.consumer, @@ -746,18 +806,30 @@ export class AlertsClient { await this.actionsAuthorization.ensureAuthorized('execute'); } - await this.unsecuredSavedObjectsClient.update( - 'alert', + const updateAttributes = this.updateMeta({ + muteAll: false, + mutedInstanceIds: [], + updatedBy: await this.getUserName(), + }); + const updateOptions = { version }; + + await partiallyUpdateAlert( + this.unsecuredSavedObjectsClient, id, - this.updateMeta({ - muteAll: false, - mutedInstanceIds: [], - updatedBy: await this.getUserName(), - }) + updateAttributes, + updateOptions + ); + } + + public async muteInstance({ alertId, alertInstanceId }: MuteOptions): Promise { + return await retryIfConflicts( + this.logger, + `alertsClient.muteInstance('${alertId}')`, + async () => await this.muteInstanceWithOCC({ alertId, alertInstanceId }) ); } - public async muteInstance({ alertId, alertInstanceId }: MuteOptions) { + private async muteInstanceWithOCC({ alertId, alertInstanceId }: MuteOptions) { const { attributes, version } = await this.unsecuredSavedObjectsClient.get( 'alert', alertId @@ -788,7 +860,15 @@ export class AlertsClient { } } - public async unmuteInstance({ + public async unmuteInstance({ alertId, alertInstanceId }: MuteOptions): Promise { + return await retryIfConflicts( + this.logger, + `alertsClient.unmuteInstance('${alertId}')`, + async () => await this.unmuteInstanceWithOCC({ alertId, alertInstanceId }) + ); + } + + private async unmuteInstanceWithOCC({ alertId, alertInstanceId, }: { diff --git a/x-pack/plugins/alerts/server/alerts_client_conflict_retries.test.ts b/x-pack/plugins/alerts/server/alerts_client_conflict_retries.test.ts new file mode 100644 index 0000000000000..1c5edb45c80fe --- /dev/null +++ b/x-pack/plugins/alerts/server/alerts_client_conflict_retries.test.ts @@ -0,0 +1,359 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { cloneDeep } from 'lodash'; + +import { AlertsClient, ConstructorOptions } from './alerts_client'; +import { savedObjectsClientMock, loggingSystemMock } from '../../../../src/core/server/mocks'; +import { taskManagerMock } from '../../task_manager/server/task_manager.mock'; +import { alertTypeRegistryMock } from './alert_type_registry.mock'; +import { alertsAuthorizationMock } from './authorization/alerts_authorization.mock'; +import { encryptedSavedObjectsMock } from '../../encrypted_saved_objects/server/mocks'; +import { actionsClientMock, actionsAuthorizationMock } from '../../actions/server/mocks'; +import { AlertsAuthorization } from './authorization/alerts_authorization'; +import { ActionsAuthorization } from '../../actions/server'; +import { SavedObjectsErrorHelpers } from '../../../../src/core/server'; +import { RetryForConflictsAttempts } from './lib/retry_if_conflicts'; +import { TaskStatus } from '../../../plugins/task_manager/server/task'; + +let alertsClient: AlertsClient; + +const MockAlertId = 'alert-id'; + +const ConflictAfterRetries = RetryForConflictsAttempts + 1; + +const taskManager = taskManagerMock.start(); +const alertTypeRegistry = alertTypeRegistryMock.create(); +const unsecuredSavedObjectsClient = savedObjectsClientMock.create(); + +const encryptedSavedObjects = encryptedSavedObjectsMock.createClient(); +const authorization = alertsAuthorizationMock.create(); +const actionsAuthorization = actionsAuthorizationMock.create(); + +const kibanaVersion = 'v7.10.0'; +const logger = loggingSystemMock.create().get(); +const alertsClientParams: jest.Mocked = { + taskManager, + alertTypeRegistry, + unsecuredSavedObjectsClient, + authorization: (authorization as unknown) as AlertsAuthorization, + actionsAuthorization: (actionsAuthorization as unknown) as ActionsAuthorization, + spaceId: 'default', + namespace: 'default', + getUserName: jest.fn(), + createAPIKey: jest.fn(), + invalidateAPIKey: jest.fn(), + logger, + encryptedSavedObjectsClient: encryptedSavedObjects, + getActionsClient: jest.fn(), + getEventLogClient: jest.fn(), + kibanaVersion, +}; + +// this suite consists of two suites running tests against mutable alertsClient APIs: +// - one to run tests where an SO update conflicts once +// - one to run tests where an SO update conflicts too many times +describe('alerts_client_conflict_retries', () => { + // tests that mutable operations work if only one SO conflict occurs + describe(`1 retry works for method`, () => { + beforeEach(() => { + mockSavedObjectUpdateConflictErrorTimes(1); + }); + + testFn(update, true); + testFn(updateApiKey, true); + testFn(enable, true); + testFn(disable, true); + testFn(muteAll, true); + testFn(unmuteAll, true); + testFn(muteInstance, true); + testFn(unmuteInstance, true); + }); + + // tests that mutable operations fail if too many SO conflicts occurs + describe(`${ConflictAfterRetries} retries fails with conflict error`, () => { + beforeEach(() => { + mockSavedObjectUpdateConflictErrorTimes(ConflictAfterRetries); + }); + + testFn(update, false); + testFn(updateApiKey, false); + testFn(enable, false); + testFn(disable, false); + testFn(muteAll, false); + testFn(unmuteAll, false); + testFn(muteInstance, false); + testFn(unmuteInstance, false); + }); +}); + +// alertsClients methods being tested +// - success is passed as an indication if the alertsClient method +// is expected to succeed or not, based on the number of conflicts +// set up in the `beforeEach()` method + +async function update(success: boolean) { + try { + await alertsClient.update({ + id: MockAlertId, + data: { + schedule: { interval: '5s' }, + name: 'cba', + tags: ['bar'], + params: { bar: true }, + throttle: '10s', + actions: [], + }, + }); + } catch (err) { + // only checking the warn messages in this test + expect(logger.warn).lastCalledWith( + `alertsClient.update('alert-id') conflict, exceeded retries` + ); + return expectConflict(success, err, 'create'); + } + expectSuccess(success, 2, 'create'); + + // only checking the debug messages in this test + expect(logger.debug).nthCalledWith(1, `alertsClient.update('alert-id') conflict, retrying ...`); +} + +async function updateApiKey(success: boolean) { + try { + await alertsClient.updateApiKey({ id: MockAlertId }); + } catch (err) { + return expectConflict(success, err); + } + + expectSuccess(success); +} + +async function enable(success: boolean) { + setupRawAlertMocks({}, { enabled: false }); + + try { + await alertsClient.enable({ id: MockAlertId }); + } catch (err) { + return expectConflict(success, err); + } + + // a successful enable call makes 2 calls to update, so that's 3 total, + // 1 with conflict + 2 on success + expectSuccess(success, 3); +} + +async function disable(success: boolean) { + try { + await alertsClient.disable({ id: MockAlertId }); + } catch (err) { + return expectConflict(success, err); + } + + expectSuccess(success); +} + +async function muteAll(success: boolean) { + try { + await alertsClient.muteAll({ id: MockAlertId }); + } catch (err) { + return expectConflict(success, err); + } + + expectSuccess(success); +} + +async function unmuteAll(success: boolean) { + try { + await alertsClient.unmuteAll({ id: MockAlertId }); + } catch (err) { + return expectConflict(success, err); + } + + expectSuccess(success); +} + +async function muteInstance(success: boolean) { + try { + await alertsClient.muteInstance({ alertId: MockAlertId, alertInstanceId: 'instance-id' }); + } catch (err) { + return expectConflict(success, err); + } + + expectSuccess(success); +} + +async function unmuteInstance(success: boolean) { + setupRawAlertMocks({}, { mutedInstanceIds: ['instance-id'] }); + try { + await alertsClient.unmuteInstance({ alertId: MockAlertId, alertInstanceId: 'instance-id' }); + } catch (err) { + return expectConflict(success, err); + } + + expectSuccess(success); +} + +// tests to run when the method is expected to succeed +function expectSuccess( + success: boolean, + count: number = 2, + method: 'update' | 'create' = 'update' +) { + expect(success).toBe(true); + expect(unsecuredSavedObjectsClient[method]).toHaveBeenCalledTimes(count); + // message content checked in the update test + expect(logger.debug).toHaveBeenCalled(); +} + +// tests to run when the method is expected to fail +function expectConflict(success: boolean, err: Error, method: 'update' | 'create' = 'update') { + const conflictErrorMessage = SavedObjectsErrorHelpers.createConflictError('alert', MockAlertId) + .message; + + expect(`${err}`).toBe(`Error: ${conflictErrorMessage}`); + expect(success).toBe(false); + expect(unsecuredSavedObjectsClient[method]).toHaveBeenCalledTimes(ConflictAfterRetries); + // message content checked in the update test + expect(logger.debug).toBeCalledTimes(RetryForConflictsAttempts); + expect(logger.warn).toBeCalledTimes(1); +} + +// wrapper to call the test function with a it's own name +function testFn(fn: (success: boolean) => unknown, success: boolean) { + test(`${fn.name}`, async () => await fn(success)); +} + +// set up mocks for update or create (the update() method uses create!) +function mockSavedObjectUpdateConflictErrorTimes(times: number) { + // default success value + const mockUpdateValue = { + id: MockAlertId, + type: 'alert', + attributes: { + actions: [], + scheduledTaskId: 'scheduled-task-id', + }, + references: [], + }; + + unsecuredSavedObjectsClient.update.mockResolvedValue(mockUpdateValue); + unsecuredSavedObjectsClient.create.mockResolvedValue(mockUpdateValue); + + // queue up specified number of errors before a success call + for (let i = 0; i < times; i++) { + unsecuredSavedObjectsClient.update.mockRejectedValueOnce( + SavedObjectsErrorHelpers.createConflictError('alert', MockAlertId) + ); + unsecuredSavedObjectsClient.create.mockRejectedValueOnce( + SavedObjectsErrorHelpers.createConflictError('alert', MockAlertId) + ); + } +} + +// set up mocks needed to get the tested methods to run +function setupRawAlertMocks( + overrides: Record = {}, + attributeOverrides: Record = {} +) { + const rawAlert = { + id: MockAlertId, + type: 'alert', + attributes: { + enabled: true, + tags: ['foo'], + alertTypeId: 'myType', + schedule: { interval: '10s' }, + consumer: 'myApp', + scheduledTaskId: 'task-123', + params: {}, + throttle: null, + actions: [], + muteAll: false, + mutedInstanceIds: [], + ...attributeOverrides, + }, + references: [], + version: '123', + ...overrides, + }; + const decryptedRawAlert = { + ...rawAlert, + attributes: { + ...rawAlert.attributes, + apiKey: Buffer.from('123:abc').toString('base64'), + }, + }; + + unsecuredSavedObjectsClient.get.mockReset(); + encryptedSavedObjects.getDecryptedAsInternalUser.mockReset(); + + // splitting this out as it's easier to set a breakpoint :-) + // eslint-disable-next-line prettier/prettier + unsecuredSavedObjectsClient.get.mockImplementation(async () => + cloneDeep(rawAlert) + ); + + encryptedSavedObjects.getDecryptedAsInternalUser.mockImplementation(async () => + cloneDeep(decryptedRawAlert) + ); +} + +// setup for each test +beforeEach(() => { + jest.resetAllMocks(); + + alertsClientParams.createAPIKey.mockResolvedValue({ apiKeysEnabled: false }); + alertsClientParams.invalidateAPIKey.mockResolvedValue({ + apiKeysEnabled: true, + result: { + invalidated_api_keys: [], + previously_invalidated_api_keys: [], + error_count: 0, + }, + }); + alertsClientParams.getUserName.mockResolvedValue('elastic'); + + taskManager.runNow.mockResolvedValue({ id: '' }); + taskManager.schedule.mockResolvedValue({ + id: 'scheduled-task-id', + scheduledAt: new Date(), + attempts: 0, + status: TaskStatus.Idle, + runAt: new Date(), + startedAt: null, + retryAt: null, + state: {}, + ownerId: null, + taskType: 'task-type', + params: {}, + }); + + const actionsClient = actionsClientMock.create(); + actionsClient.getBulk.mockResolvedValue([]); + alertsClientParams.getActionsClient.mockResolvedValue(actionsClient); + + alertTypeRegistry.get.mockImplementation((id) => ({ + id: '123', + name: 'Test', + actionGroups: [{ id: 'default', name: 'Default' }], + defaultActionGroupId: 'default', + async executor() {}, + producer: 'alerts', + })); + + alertTypeRegistry.get.mockReturnValue({ + id: 'myType', + name: 'Test', + actionGroups: [{ id: 'default', name: 'Default' }], + defaultActionGroupId: 'default', + async executor() {}, + producer: 'alerts', + }); + + alertsClient = new AlertsClient(alertsClientParams); + + setupRawAlertMocks(); +}); diff --git a/x-pack/plugins/alerts/server/lib/retry_if_conflicts.test.ts b/x-pack/plugins/alerts/server/lib/retry_if_conflicts.test.ts new file mode 100644 index 0000000000000..19caccc753e38 --- /dev/null +++ b/x-pack/plugins/alerts/server/lib/retry_if_conflicts.test.ts @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { SavedObjectsErrorHelpers } from '../../../../../src/core/server'; +import { retryIfConflicts, RetryForConflictsAttempts } from './retry_if_conflicts'; +import { loggingSystemMock } from '../../../../../src/core/server/mocks'; + +describe('retry_if_conflicts', () => { + beforeEach(() => { + jest.resetAllMocks(); + }); + + test('should work when operation is a success', async () => { + const result = await retryIfConflicts(MockLogger, MockOperationName, OperationSuccessful); + expect(result).toBe(MockResult); + }); + + test('should throw error if not a conflict error', async () => { + await expect( + retryIfConflicts(MockLogger, MockOperationName, OperationFailure) + ).rejects.toThrowError('wops'); + }); + + for (let i = 1; i <= RetryForConflictsAttempts; i++) { + test(`should work when operation conflicts ${i} times`, async () => { + const result = await retryIfConflicts( + MockLogger, + MockOperationName, + getOperationConflictsTimes(i) + ); + expect(result).toBe(MockResult); + expect(MockLogger.debug).toBeCalledTimes(i); + for (let j = 0; j < i; j++) { + expect(MockLogger.debug).nthCalledWith(i, `${MockOperationName} conflict, retrying ...`); + } + }); + } + + test(`should throw conflict error when conflicts > ${RetryForConflictsAttempts} times`, async () => { + await expect( + retryIfConflicts( + MockLogger, + MockOperationName, + getOperationConflictsTimes(RetryForConflictsAttempts + 1) + ) + ).rejects.toThrowError(SavedObjectsErrorHelpers.createConflictError('alert', MockAlertId)); + expect(MockLogger.debug).toBeCalledTimes(RetryForConflictsAttempts); + expect(MockLogger.warn).toBeCalledTimes(1); + expect(MockLogger.warn).toBeCalledWith(`${MockOperationName} conflict, exceeded retries`); + }); +}); + +const MockAlertId = 'alert-id'; +const MockOperationName = 'conflict-retryable-operation'; +const MockLogger = loggingSystemMock.create().get(); +const MockResult = 42; + +async function OperationSuccessful() { + return MockResult; +} + +async function OperationFailure() { + throw new Error('wops'); +} + +function getOperationConflictsTimes(times: number) { + return async function OperationConflictsTimes() { + times--; + if (times >= 0) { + throw SavedObjectsErrorHelpers.createConflictError('alert', MockAlertId); + } + + return MockResult; + }; +} diff --git a/x-pack/plugins/alerts/server/lib/retry_if_conflicts.ts b/x-pack/plugins/alerts/server/lib/retry_if_conflicts.ts new file mode 100644 index 0000000000000..9cb1d7975855c --- /dev/null +++ b/x-pack/plugins/alerts/server/lib/retry_if_conflicts.ts @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +// This module provides a helper to perform retries on a function if the +// function ends up throwing a SavedObject 409 conflict. This can happen +// when alert SO's are updated in the background, and will avoid having to +// have the caller make explicit conflict checks, where the conflict was +// caused by a background update. + +import { Logger, SavedObjectsErrorHelpers } from '../../../../../src/core/server'; + +type RetryableForConflicts = () => Promise; + +// number of times to retry when conflicts occur +// note: it seems unlikely that we'd need more than one retry, but leaving +// this statically configurable in case we DO need > 1 +export const RetryForConflictsAttempts = 1; + +// milliseconds to wait before retrying when conflicts occur +// note: we considered making this random, to help avoid a stampede, but +// with 1 retry it probably doesn't matter, and adding randomness could +// make it harder to diagnose issues +const RetryForConflictsDelay = 250; + +// retry an operation if it runs into 409 Conflict's, up to a limit +export async function retryIfConflicts( + logger: Logger, + name: string, + operation: RetryableForConflicts, + retries: number = RetryForConflictsAttempts +): Promise { + // run the operation, return if no errors or throw if not a conflict error + try { + return await operation(); + } catch (err) { + if (!SavedObjectsErrorHelpers.isConflictError(err)) { + throw err; + } + + // must be a conflict; if no retries left, throw it + if (retries <= 0) { + logger.warn(`${name} conflict, exceeded retries`); + throw err; + } + + // delay a bit before retrying + logger.debug(`${name} conflict, retrying ...`); + await waitBeforeNextRetry(); + return await retryIfConflicts(logger, name, operation, retries - 1); + } +} + +async function waitBeforeNextRetry(): Promise { + await new Promise((resolve) => setTimeout(resolve, RetryForConflictsDelay)); +} diff --git a/x-pack/plugins/alerts/server/saved_objects/index.ts b/x-pack/plugins/alerts/server/saved_objects/index.ts index 06ce8d673e6b7..51ac68b589977 100644 --- a/x-pack/plugins/alerts/server/saved_objects/index.ts +++ b/x-pack/plugins/alerts/server/saved_objects/index.ts @@ -9,6 +9,23 @@ import mappings from './mappings.json'; import { getMigrations } from './migrations'; import { EncryptedSavedObjectsPluginSetup } from '../../../encrypted_saved_objects/server'; +export { partiallyUpdateAlert } from './partially_update_alert'; + +export const AlertAttributesExcludedFromAAD = [ + 'scheduledTaskId', + 'muteAll', + 'mutedInstanceIds', + 'updatedBy', +]; + +// useful for Pick which is a +// type which is a subset of RawAlert with just attributes excluded from AAD +export type AlertAttributesExcludedFromAADType = + | 'scheduledTaskId' + | 'muteAll' + | 'mutedInstanceIds' + | 'updatedBy'; + export function setupSavedObjects( savedObjects: SavedObjectsServiceSetup, encryptedSavedObjects: EncryptedSavedObjectsPluginSetup diff --git a/x-pack/plugins/alerts/server/saved_objects/partially_update_alert.test.ts b/x-pack/plugins/alerts/server/saved_objects/partially_update_alert.test.ts new file mode 100644 index 0000000000000..50815c797e399 --- /dev/null +++ b/x-pack/plugins/alerts/server/saved_objects/partially_update_alert.test.ts @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { + SavedObjectsClientContract, + ISavedObjectsRepository, + SavedObjectsErrorHelpers, +} from '../../../../../src/core/server'; + +import { partiallyUpdateAlert, PartiallyUpdateableAlertAttributes } from './partially_update_alert'; +import { savedObjectsClientMock } from '../../../../../src/core/server/mocks'; + +const MockSavedObjectsClientContract = savedObjectsClientMock.create(); +const MockISavedObjectsRepository = (MockSavedObjectsClientContract as unknown) as jest.Mocked< + ISavedObjectsRepository +>; + +describe('partially_update_alert', () => { + beforeEach(() => { + jest.resetAllMocks(); + }); + + for (const [soClientName, soClient] of Object.entries(getMockSavedObjectClients())) + describe(`using ${soClientName}`, () => { + test('should work with no options', async () => { + soClient.update.mockResolvedValueOnce(MockUpdateValue); + + await partiallyUpdateAlert(soClient, MockAlertId, DefaultAttributes); + expect(soClient.update).toHaveBeenCalledWith('alert', MockAlertId, DefaultAttributes, {}); + }); + + test('should work with extraneous attributes ', async () => { + const attributes = (InvalidAttributes as unknown) as PartiallyUpdateableAlertAttributes; + soClient.update.mockResolvedValueOnce(MockUpdateValue); + + await partiallyUpdateAlert(soClient, MockAlertId, attributes); + expect(soClient.update).toHaveBeenCalledWith('alert', MockAlertId, DefaultAttributes, {}); + }); + + test('should handle SO errors', async () => { + soClient.update.mockRejectedValueOnce(new Error('wops')); + + await expect( + partiallyUpdateAlert(soClient, MockAlertId, DefaultAttributes) + ).rejects.toThrowError('wops'); + }); + + test('should handle the version option', async () => { + soClient.update.mockResolvedValueOnce(MockUpdateValue); + + await partiallyUpdateAlert(soClient, MockAlertId, DefaultAttributes, { version: '1.2.3' }); + expect(soClient.update).toHaveBeenCalledWith('alert', MockAlertId, DefaultAttributes, { + version: '1.2.3', + }); + }); + + test('should handle the ignore404 option', async () => { + const err = SavedObjectsErrorHelpers.createGenericNotFoundError(); + soClient.update.mockRejectedValueOnce(err); + + await partiallyUpdateAlert(soClient, MockAlertId, DefaultAttributes, { ignore404: true }); + expect(soClient.update).toHaveBeenCalledWith('alert', MockAlertId, DefaultAttributes, {}); + }); + + test('should handle the namespace option', async () => { + soClient.update.mockResolvedValueOnce(MockUpdateValue); + + await partiallyUpdateAlert(soClient, MockAlertId, DefaultAttributes, { + namespace: 'bat.cave', + }); + expect(soClient.update).toHaveBeenCalledWith('alert', MockAlertId, DefaultAttributes, { + namespace: 'bat.cave', + }); + }); + }); +}); + +function getMockSavedObjectClients(): Record< + string, + jest.Mocked +> { + return { + SavedObjectsClientContract: MockSavedObjectsClientContract, + // doesn't appear to be a mock for this, but it's basically the same as the above, + // so just cast it to make sure we catch any type errors + ISavedObjectsRepository: MockISavedObjectsRepository, + }; +} + +const DefaultAttributes = { + scheduledTaskId: 'scheduled-task-id', + muteAll: true, + mutedInstanceIds: ['muted-instance-id-1', 'muted-instance-id-2'], + updatedBy: 'someone', +}; + +const InvalidAttributes = { ...DefaultAttributes, foo: 'bar' }; + +const MockAlertId = 'alert-id'; + +const MockUpdateValue = { + id: MockAlertId, + type: 'alert', + attributes: { + actions: [], + scheduledTaskId: 'scheduled-task-id', + }, + references: [], +}; diff --git a/x-pack/plugins/alerts/server/saved_objects/partially_update_alert.ts b/x-pack/plugins/alerts/server/saved_objects/partially_update_alert.ts new file mode 100644 index 0000000000000..cc25aaba35798 --- /dev/null +++ b/x-pack/plugins/alerts/server/saved_objects/partially_update_alert.ts @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { pick } from 'lodash'; +import { RawAlert } from '../types'; + +import { + SavedObjectsClient, + SavedObjectsErrorHelpers, + SavedObjectsUpdateOptions, +} from '../../../../../src/core/server'; + +import { AlertAttributesExcludedFromAAD, AlertAttributesExcludedFromAADType } from './index'; + +export type PartiallyUpdateableAlertAttributes = Pick; + +export interface PartiallyUpdateAlertSavedObjectOptions { + version?: string; + ignore404?: boolean; + namespace?: string; // only should be used with ISavedObjectsRepository +} + +// typed this way so we can send a SavedObjectClient or SavedObjectRepository +type SavedObjectClientForUpdate = Pick; + +// direct, partial update to an alert saved object via scoped SavedObjectsClient +// using namespace set in the client +export async function partiallyUpdateAlert( + savedObjectsClient: SavedObjectClientForUpdate, + id: string, + attributes: PartiallyUpdateableAlertAttributes, + options: PartiallyUpdateAlertSavedObjectOptions = {} +): Promise { + // ensure we only have the valid attributes excluded from AAD + const attributeUpdates = pick(attributes, AlertAttributesExcludedFromAAD); + const updateOptions: SavedObjectsUpdateOptions = pick(options, 'namespace', 'version'); + + try { + await savedObjectsClient.update('alert', id, attributeUpdates, updateOptions); + } catch (err) { + if (options?.ignore404 && SavedObjectsErrorHelpers.isNotFoundError(err)) { + return; + } + throw err; + } +}