Skip to content

Commit

Permalink
Passing function into rule executor to determine whether to write AAD
Browse files Browse the repository at this point in the history
  • Loading branch information
ymao1 committed Oct 11, 2021
1 parent 658ec46 commit 7fac6a2
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 41 deletions.
1 change: 1 addition & 0 deletions x-pack/plugins/alerting/server/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ const createAlertServicesMock = <
.mockReturnValue(alertInstanceFactoryMock),
savedObjectsClient: savedObjectsClientMock.create(),
scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(),
shouldLogAndScheduleActionsForAlerts: () => true,
};
};
export type AlertServicesMock = ReturnType<typeof createAlertServicesMock>;
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/alerting/server/task_runner/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ export class TaskRunner<
InstanceContext,
WithoutReservedActionGroups<ActionGroupIds, RecoveryActionGroupId>
>(alertInstances),
shouldLogAndScheduleActionsForAlerts: () => this.shouldLogAndScheduleActionsForAlerts(),
},
params,
state: alertTypeState as State,
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/alerting/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export interface AlertServices<
alertInstanceFactory: (
id: string
) => PublicAlertInstance<InstanceState, InstanceContext, ActionGroupIds>;
shouldLogAndScheduleActionsForAlerts: () => boolean;
}

export interface AlertExecutorOptions<
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/apm/server/lib/alerts/test_utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export const createRuleTypeMocks = () => {
alertInstanceFactory: jest.fn(() => ({ scheduleActions })),
alertWithLifecycle: jest.fn(),
logger: loggerMock,
shouldLogAndScheduleActionsForAlerts: () => true,
};

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,33 @@ describe('createLifecycleExecutor', () => {
})
);
});

it('does not write alert documents when rule execution is cancelled and feature flags indicate to skip', async () => {
const logger = loggerMock.create();
const ruleDataClientMock = createRuleDataClientMock();
const executor = createLifecycleExecutor(
logger,
ruleDataClientMock
)<{}, TestRuleState, never, never, never>(async (options) => {
expect(options.state).toEqual(initialRuleState);

const nextRuleState: TestRuleState = {
aRuleStateKey: 'NEXT_RULE_STATE_VALUE',
};

return nextRuleState;
});

await executor(
createDefaultAlertExecutorOptions({
params: {},
state: { wrapped: initialRuleState, trackedAlerts: {} },
shouldLogAndScheduleActionsForAlerts: false,
})
);

expect(ruleDataClientMock.getWriter).not.toHaveBeenCalled();
});
});

type TestRuleState = Record<string, unknown> & {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ export const createLifecycleExecutor =
>
): Promise<WrappedLifecycleRuleState<State>> => {
const {
services: { alertInstanceFactory },
services: { alertInstanceFactory, shouldLogAndScheduleActionsForAlerts },
state: previousState,
} = options;

Expand Down Expand Up @@ -281,7 +281,16 @@ export const createLifecycleExecutor =
const newEventsToIndex = makeEventsDataMapFor(newAlertIds);
const allEventsToIndex = [...trackedEventsToIndex, ...newEventsToIndex];

if (allEventsToIndex.length > 0 && ruleDataClient.isWriteEnabled()) {
// Only write alerts if:
// - writing is enabled
// AND
// - rule execution has not been cancelled due to timeout
// OR
// - if execution has been cancelled due to timeout, if feature flags are configured to write alerts anyway
const shouldWriteAlerts =
ruleDataClient.isWriteEnabled() && shouldLogAndScheduleActionsForAlerts();

if (allEventsToIndex.length > 0 && shouldWriteAlerts) {
logger.debug(`Preparing to index ${allEventsToIndex.length} alerts.`);

await ruleDataClient.getWriter().bulk({
Expand All @@ -307,6 +316,6 @@ export const createLifecycleExecutor =

return {
wrapped: nextWrappedState ?? ({} as State),
trackedAlerts: ruleDataClient.isWriteEnabled() ? nextTrackedAlerts : {},
trackedAlerts: shouldWriteAlerts ? nextTrackedAlerts : {},
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { createLifecycleRuleTypeFactory } from './create_lifecycle_rule_type_fac

type RuleTestHelpers = ReturnType<typeof createRule>;

function createRule() {
function createRule(shouldLogAndScheduleActionsForAlerts: boolean = true) {
const ruleDataClientMock = createRuleDataClientMock();

const factory = createLifecycleRuleTypeFactory({
Expand Down Expand Up @@ -110,6 +110,7 @@ function createRule() {
alertInstanceFactory,
savedObjectsClient: {} as any,
scopedClusterClient: {} as any,
shouldLogAndScheduleActionsForAlerts: () => shouldLogAndScheduleActionsForAlerts,
},
spaceId: 'spaceId',
state,
Expand Down Expand Up @@ -152,6 +153,26 @@ describe('createLifecycleRuleTypeFactory', () => {
});
});

describe('when rule is cancelled due to timeout and config flags indicate to skip actions', () => {
beforeEach(() => {
helpers = createRule(false);
helpers.ruleDataClientMock.isWriteEnabled.mockReturnValue(true);
});

it("doesn't persist anything", async () => {
await helpers.alertWithLifecycle([
{
id: 'opbeans-java',
fields: {
'service.name': 'opbeans-java',
},
},
]);

expect(helpers.ruleDataClientMock.getWriter().bulk).toHaveBeenCalledTimes(0);
});
});

describe('when alerts are new', () => {
beforeEach(async () => {
await helpers.alertWithLifecycle([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ export const createPersistenceRuleTypeFactory: CreatePersistenceRuleTypeFactory
const numAlerts = alerts.length;
logger.debug(`Found ${numAlerts} alerts.`);

if (ruleDataClient.isWriteEnabled() && numAlerts) {
if (
ruleDataClient.isWriteEnabled() &&
numAlerts &&
options.services.shouldLogAndScheduleActionsForAlerts() // This check ensures that rule execution has not been cancelled due to timeout or, if it has whether we still want to proceed with handling alerts after rule timeout
) {
const commonRuleFields = getCommonAlertFields(options);

const response = await ruleDataClient.getWriter().bulk({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export const createDefaultAlertExecutorOptions = <
createdAt = new Date(),
startedAt = new Date(),
updatedAt = new Date(),
shouldLogAndScheduleActionsForAlerts = true,
}: {
alertId?: string;
ruleName?: string;
Expand All @@ -39,6 +40,7 @@ export const createDefaultAlertExecutorOptions = <
createdAt?: Date;
startedAt?: Date;
updatedAt?: Date;
shouldLogAndScheduleActionsForAlerts?: boolean;
}): AlertExecutorOptions<Params, State, InstanceState, InstanceContext, ActionGroupIds> => ({
alertId,
createdBy: 'CREATED_BY',
Expand Down Expand Up @@ -69,6 +71,7 @@ export const createDefaultAlertExecutorOptions = <
.alertInstanceFactory,
savedObjectsClient: savedObjectsClientMock.create(),
scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(),
shouldLogAndScheduleActionsForAlerts: () => shouldLogAndScheduleActionsForAlerts,
},
state,
updatedBy: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export const createRuleTypeMocks = (
findAlerts: jest.fn(), // TODO: does this stay?
alertWithPersistence: jest.fn(),
logger: loggerMock,
shouldLogAndScheduleActionsForAlerts: () => true,
};

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,53 +61,64 @@ export const bulkCreateFactory =
`individual bulk process time took: ${makeFloatString(end - start)} milliseconds`
)
);
logger.debug(
buildRuleMessage(`took property says bulk took: ${response.body.took} milliseconds`)
);

const createdItems = wrappedDocs
.map((doc, index) => {
const responseIndex = response.body.items[index].index;
return {
_id: responseIndex?._id ?? '',
_index: responseIndex?._index ?? '',
[ALERT_INSTANCE_ID]: responseIndex?._id ?? '',
...doc._source,
};
})
.filter((_, index) => response.body.items[index].index?.status === 201);
const createdItemsCount = createdItems.length;
if (response) {
logger.debug(
buildRuleMessage(`took property says bulk took: ${response.body.took} milliseconds`)
);

const duplicateSignalsCount = countBy(response.body.items, 'create.status')['409'];
const errorCountByMessage = errorAggregator(response.body, [409]);
const createdItems = wrappedDocs
.map((doc, index) => {
const responseIndex = response.body.items[index].index;
return {
_id: responseIndex?._id ?? '',
_index: responseIndex?._index ?? '',
[ALERT_INSTANCE_ID]: responseIndex?._id ?? '',
...doc._source,
};
})
.filter((_, index) => response.body.items[index].index?.status === 201);

logger.debug(buildRuleMessage(`bulk created ${createdItemsCount} signals`));
const createdItemsCount = createdItems.length;
const duplicateSignalsCount = countBy(response.body.items, 'create.status')['409'];
const errorCountByMessage = errorAggregator(response.body, [409]);

if (duplicateSignalsCount > 0) {
logger.debug(buildRuleMessage(`ignored ${duplicateSignalsCount} duplicate signals`));
}
logger.debug(buildRuleMessage(`bulk created ${createdItemsCount} signals`));

if (!isEmpty(errorCountByMessage)) {
logger.error(
buildRuleMessage(
`[-] bulkResponse had errors with responses of: ${JSON.stringify(errorCountByMessage)}`
)
);
if (duplicateSignalsCount > 0) {
logger.debug(buildRuleMessage(`ignored ${duplicateSignalsCount} duplicate signals`));
}

return {
errors: Object.keys(errorCountByMessage),
success: false,
bulkCreateDuration: makeFloatString(end - start),
createdItemsCount: createdItems.length,
createdItems,
};
if (!isEmpty(errorCountByMessage)) {
logger.error(
buildRuleMessage(
`[-] bulkResponse had errors with responses of: ${JSON.stringify(errorCountByMessage)}`
)
);

return {
errors: Object.keys(errorCountByMessage),
success: false,
bulkCreateDuration: makeFloatString(end - start),
createdItemsCount: createdItems.length,
createdItems,
};
} else {
return {
errors: [],
success: true,
bulkCreateDuration: makeFloatString(end - start),
createdItemsCount: createdItems.length,
createdItems,
};
}
} else {
return {
errors: [],
success: true,
bulkCreateDuration: makeFloatString(end - start),
createdItemsCount: createdItems.length,
createdItems,
createdItemsCount: 0,
createdItems: [],
};
}
};

0 comments on commit 7fac6a2

Please sign in to comment.