Skip to content

Commit

Permalink
Cleanup action task params objects after successful execution (#55227)
Browse files Browse the repository at this point in the history
* Cleanup action task params saved objects after use

* Fix jest tests

* Add integration test to ensure object gets cleaned up

* Add unit tests

* Fix comment

* Re-use updated_at instead of creating createdAt

* Consider null/undefined returned from executor as success as well

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
mikecote and elasticmachine authored Jan 27, 2020
1 parent 4df1c4c commit 8fe39ae
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 4 deletions.
2 changes: 1 addition & 1 deletion x-pack/plugins/actions/server/lib/action_executor.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { ActionExecutorContract } from './action_executor';
const createActionExecutorMock = () => {
const mocked: jest.Mocked<ActionExecutorContract> = {
initialize: jest.fn(),
execute: jest.fn(),
execute: jest.fn().mockResolvedValue({ status: 'ok', actionId: '' }),
};
return mocked;
};
Expand Down
55 changes: 55 additions & 0 deletions x-pack/plugins/actions/server/lib/task_runner_factory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,18 @@ const actionExecutorInitializerParams = {
};
const taskRunnerFactoryInitializerParams = {
spaceIdToNamespace,
logger: loggingServiceMock.create().get(),
encryptedSavedObjectsPlugin: mockedEncryptedSavedObjectsPlugin,
getBasePath: jest.fn().mockReturnValue(undefined),
getScopedSavedObjectsClient: jest.fn().mockReturnValue(services.savedObjectsClient),
};

beforeEach(() => {
jest.resetAllMocks();
actionExecutorInitializerParams.getServices.mockReturnValue(services);
taskRunnerFactoryInitializerParams.getScopedSavedObjectsClient.mockReturnValue(
services.savedObjectsClient
);
});

test(`throws an error if factory isn't initialized`, () => {
Expand Down Expand Up @@ -135,6 +140,56 @@ test('executes the task by calling the executor with proper parameters', async (
});
});

test('cleans up action_task_params object', async () => {
const taskRunner = taskRunnerFactory.create({
taskInstance: mockedTaskInstance,
});

mockedActionExecutor.execute.mockResolvedValueOnce({ status: 'ok', actionId: '2' });
spaceIdToNamespace.mockReturnValueOnce('namespace-test');
mockedEncryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '3',
type: 'action_task_params',
attributes: {
actionId: '2',
params: { baz: true },
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [],
});

await taskRunner.run();

expect(services.savedObjectsClient.delete).toHaveBeenCalledWith('action_task_params', '3');
});

test('runs successfully when cleanup fails and logs the error', async () => {
const taskRunner = taskRunnerFactory.create({
taskInstance: mockedTaskInstance,
});

mockedActionExecutor.execute.mockResolvedValueOnce({ status: 'ok', actionId: '2' });
spaceIdToNamespace.mockReturnValueOnce('namespace-test');
mockedEncryptedSavedObjectsPlugin.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '3',
type: 'action_task_params',
attributes: {
actionId: '2',
params: { baz: true },
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [],
});
services.savedObjectsClient.delete.mockRejectedValueOnce(new Error('Fail'));

await taskRunner.run();

expect(services.savedObjectsClient.delete).toHaveBeenCalledWith('action_task_params', '3');
expect(taskRunnerFactoryInitializerParams.logger.error).toHaveBeenCalledWith(
'Failed to cleanup action_task_params object [id="3"]: Fail'
);
});

test('throws an error with suggested retry logic when return status is error', async () => {
const taskRunner = taskRunnerFactory.create({
taskInstance: mockedTaskInstance,
Expand Down
17 changes: 17 additions & 0 deletions x-pack/plugins/actions/server/lib/task_runner_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@

import { ActionExecutorContract } from './action_executor';
import { ExecutorError } from './executor_error';
import { Logger, CoreStart } from '../../../../../src/core/server';
import { RunContext } from '../../../task_manager/server';
import { PluginStartContract as EncryptedSavedObjectsStartContract } from '../../../encrypted_saved_objects/server';
import { ActionTaskParams, GetBasePathFunction, SpaceIdToNamespaceFunction } from '../types';

export interface TaskRunnerContext {
logger: Logger;
encryptedSavedObjectsPlugin: EncryptedSavedObjectsStartContract;
spaceIdToNamespace: SpaceIdToNamespaceFunction;
getBasePath: GetBasePathFunction;
getScopedSavedObjectsClient: CoreStart['savedObjects']['getScopedClient'];
}

export class TaskRunnerFactory {
Expand All @@ -40,9 +43,11 @@ export class TaskRunnerFactory {

const { actionExecutor } = this;
const {
logger,
encryptedSavedObjectsPlugin,
spaceIdToNamespace,
getBasePath,
getScopedSavedObjectsClient,
} = this.taskRunnerContext!;

return {
Expand Down Expand Up @@ -85,6 +90,7 @@ export class TaskRunnerFactory {
actionId,
request: fakeRequest,
});

if (executorResult.status === 'error') {
// Task manager error handler only kicks in when an error thrown (at this time)
// So what we have to do is throw when the return status is `error`.
Expand All @@ -94,6 +100,17 @@ export class TaskRunnerFactory {
executorResult.retry == null ? false : executorResult.retry
);
}

// Cleanup action_task_params object now that we're done with it
try {
const savedObjectsClient = getScopedSavedObjectsClient(fakeRequest);
await savedObjectsClient.delete('action_task_params', actionTaskParamsId);
} catch (e) {
// Log error only, we shouldn't fail the task because of an error here (if ever there's retry logic)
logger.error(
`Failed to cleanup action_task_params object [id="${actionTaskParamsId}"]: ${e.message}`
);
}
},
};
}
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/actions/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,11 @@ export class ActionsPlugin implements Plugin<Promise<PluginSetupContract>, Plugi
});

taskRunnerFactory!.initialize({
logger,
encryptedSavedObjectsPlugin: plugins.encryptedSavedObjects,
getBasePath: this.getBasePath,
spaceIdToNamespace: this.spaceIdToNamespace,
getScopedSavedObjectsClient: core.savedObjects.getScopedClient,
});

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export default function(kibana: any) {
encrypted: schema.string(),
}),
},
async executor({ config, secrets, params, services }: ActionTypeExecutorOptions) {
async executor({ config, secrets, params, services, actionId }: ActionTypeExecutorOptions) {
await services.callCluster('index', {
index: params.index,
refresh: 'wait_for',
Expand All @@ -74,6 +74,7 @@ export default function(kibana: any) {
source: 'action:test.index-record',
},
});
return { status: 'ok', actionId };
},
};
const failingActionType: ActionType = {
Expand Down Expand Up @@ -141,7 +142,7 @@ export default function(kibana: any) {
reference: schema.string(),
}),
},
async executor({ params, services }: ActionTypeExecutorOptions) {
async executor({ params, services, actionId }: ActionTypeExecutorOptions) {
// Call cluster
let callClusterSuccess = false;
let callClusterError;
Expand Down Expand Up @@ -186,8 +187,8 @@ export default function(kibana: any) {
},
});
return {
actionId,
status: 'ok',
actionId: '',
};
},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,37 @@ export class TaskManagerUtils {
}
});
}

async waitForActionTaskParamsToBeCleanedUp(createdAtFilter: Date): Promise<void> {
return await this.retry.try(async () => {
const searchResult = await this.es.search({
index: '.kibana',
body: {
query: {
bool: {
must: [
{
term: {
type: 'action_task_params',
},
},
{
range: {
updated_at: {
gte: createdAtFilter,
},
},
},
],
},
},
},
});
if (searchResult.hits.total.value) {
throw new Error(
`Expected 0 action_task_params objects but received ${searchResult.hits.total.value}`
);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ export default function alertTests({ getService }: FtrProviderContext) {
reference,
source: 'action:test.index-record',
});

await taskManagerUtils.waitForActionTaskParamsToBeCleanedUp(testStart);
break;
default:
throw new Error(`Scenario untested: ${JSON.stringify(scenario)}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
ObjectRemover,
AlertUtils,
ensureDatetimeIsWithinRange,
TaskManagerUtils,
} from '../../../common/lib';

// eslint-disable-next-line import/no-default-export
Expand All @@ -24,6 +25,7 @@ export function alertTests({ getService }: FtrProviderContext, space: Space) {
const es = getService('legacyEs');
const retry = getService('retry');
const esTestIndexTool = new ESTestIndexTool(es, retry);
const taskManagerUtils = new TaskManagerUtils(es, retry);

function getAlertingTaskById(taskId: string) {
return supertestWithoutAuth
Expand Down Expand Up @@ -73,6 +75,7 @@ export function alertTests({ getService }: FtrProviderContext, space: Space) {
});

it('should schedule task, run alert and schedule actions', async () => {
const testStart = new Date();
const reference = alertUtils.generateReference();
const response = await alertUtils.createAlwaysFiringAction({ reference });
const alertId = response.body.id;
Expand Down Expand Up @@ -121,6 +124,8 @@ export function alertTests({ getService }: FtrProviderContext, space: Space) {
reference,
source: 'action:test.index-record',
});

await taskManagerUtils.waitForActionTaskParamsToBeCleanedUp(testStart);
});

it('should reschedule failing alerts using the alerting interval and not the Task Manager retry logic', async () => {
Expand Down

0 comments on commit 8fe39ae

Please sign in to comment.