From 6851db2fbe45e77dcae486dc2b350bc551379947 Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Sun, 14 Nov 2021 22:13:57 -0500 Subject: [PATCH] Updating task uuid based on PR comments --- .../task_manager/server/task_pool.test.ts | 15 ++++++++++++--- x-pack/plugins/task_manager/server/task_pool.ts | 16 +++++++++------- .../task_running/ephemeral_task_runner.ts | 14 +++++++++++--- .../server/task_running/task_runner.test.ts | 17 +++++++++++++++++ .../server/task_running/task_runner.ts | 17 +++++++++++++---- 5 files changed, 62 insertions(+), 17 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_pool.test.ts b/x-pack/plugins/task_manager/server/task_pool.test.ts index fca599a2d8981..8b10fb6c1ca3d 100644 --- a/x-pack/plugins/task_manager/server/task_pool.test.ts +++ b/x-pack/plugins/task_manager/server/task_pool.test.ts @@ -360,7 +360,7 @@ describe('TaskPool', () => { test('only allows one task with the same id in the task pool', async () => { const logger = loggingSystemMock.create().get(); const pool = new TaskPool({ - maxWorkers$: of(1), + maxWorkers$: of(2), logger, }); @@ -369,7 +369,13 @@ describe('TaskPool', () => { const taskId = uuid.v4(); const task1 = mockTask({ id: taskId, run: shouldRun }); - const task2 = mockTask({ id: taskId, run: shouldNotRun }); + const task2 = mockTask({ + id: taskId, + run: shouldNotRun, + isSameTask() { + return true; + }, + }); await pool.run([task1]); await pool.run([task2]); @@ -388,7 +394,7 @@ describe('TaskPool', () => { function mockTask(overrides = {}) { return { isExpired: false, - taskUuid: uuid.v4(), + taskExecutionId: uuid.v4(), id: uuid.v4(), cancel: async () => undefined, markTaskAsRunning: jest.fn(async () => true), @@ -409,6 +415,9 @@ describe('TaskPool', () => { createTaskRunner: jest.fn(), }; }, + isSameTask() { + return false; + }, ...overrides, }; } diff --git a/x-pack/plugins/task_manager/server/task_pool.ts b/x-pack/plugins/task_manager/server/task_pool.ts index 65eda9c8c1d17..d8a9fdd6223fe 100644 --- a/x-pack/plugins/task_manager/server/task_pool.ts +++ b/x-pack/plugins/task_manager/server/task_pool.ts @@ -114,17 +114,19 @@ export class TaskPool { tasksToRun .filter( (taskRunner) => - !Array.from(this.tasksInPool.keys()).some((key) => key.startsWith(taskRunner.id)) + !Array.from(this.tasksInPool.keys()).some((executionId: string) => + taskRunner.isSameTask(executionId) + ) ) .map(async (taskRunner) => { - // We use taskRunner.taskUuid instead of taskRunner.id as key for the task pool map because + // We use taskRunner.taskExecutionId instead of taskRunner.id as key for the task pool map because // task cancellation is a non-blocking procedure. We calculate the expiration and immediately remove // the task from the task pool. There is a race condition that can occur when a recurring tasks's schedule // matches its timeout value. A new instance of the task can be claimed and added to the task pool before // the cancel function (meant for the previous instance of the task) is actually called. This means the wrong - // task instance is cancelled. We introduce the taskUuid to differentiate between these overlapping instances and + // task instance is cancelled. We introduce the taskExecutionId to differentiate between these overlapping instances and // ensure that the correct task instance is cancelled. - this.tasksInPool.set(taskRunner.taskUuid, taskRunner); + this.tasksInPool.set(taskRunner.taskExecutionId, taskRunner); return taskRunner .markTaskAsRunning() .then((hasTaskBeenMarkAsRunning: boolean) => @@ -175,12 +177,12 @@ export class TaskPool { } }) .then(() => { - this.tasksInPool.delete(taskRunner.taskUuid); + this.tasksInPool.delete(taskRunner.taskExecutionId); }); } private handleFailureOfMarkAsRunning(task: TaskRunner, err: Error) { - this.tasksInPool.delete(task.taskUuid); + this.tasksInPool.delete(task.taskExecutionId); this.logger.error(`Failed to mark Task ${task.toString()} as running: ${err.message}`); } @@ -208,7 +210,7 @@ export class TaskPool { private async cancelTask(task: TaskRunner) { try { this.logger.debug(`Cancelling task ${task.toString()}.`); - this.tasksInPool.delete(task.taskUuid); + this.tasksInPool.delete(task.taskExecutionId); await task.cancel(); } catch (err) { this.logger.error(`Failed to cancel task ${task.toString()}: ${err}`); diff --git a/x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts b/x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts index 701b44f73db25..0695ed149c9a4 100644 --- a/x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts @@ -115,10 +115,18 @@ export class EphemeralTaskManagerRunner implements TaskRunner { } /** - * Gets the unique uuid of this task instance. + * Gets the exeuction id of this task instance. */ - public get taskUuid() { - return `${this.id}-${this.uuid}`; + public get taskExecutionId() { + return `${this.id}::${this.uuid}`; + } + + /** + * Test whether given execution ID identifies a different execution of this same task + * @param id + */ + public isSameTask(executionId: string) { + return executionId.startsWith(this.id); } /** diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts index 86e2230461eb5..02be86c3db0c2 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts @@ -32,6 +32,10 @@ const minutesFromNow = (mins: number): Date => secondsFromNow(mins * 60); let fakeTimer: sinon.SinonFakeTimers; +jest.mock('uuid', () => ({ + v4: () => 'NEW_UUID', +})); + beforeAll(() => { fakeTimer = sinon.useFakeTimers(); }); @@ -45,6 +49,19 @@ describe('TaskManagerRunner', () => { end: jest.fn(), }; + test('execution ID', async () => { + const { runner } = await pendingStageSetup({ + instance: { + id: 'foo', + taskType: 'bar', + }, + }); + + expect(runner.taskExecutionId).toEqual(`foo::NEW_UUID`); + expect(runner.isSameTask(`foo::ANOTHER_UUID`)).toEqual(true); + expect(runner.isSameTask(`bar::ANOTHER_UUID`)).toEqual(false); + }); + describe('Pending Stage', () => { beforeEach(() => { jest.clearAllMocks(); diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts index 43d401f8c327a..cd07b4db728c4 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -69,10 +69,11 @@ export interface TaskRunner { markTaskAsRunning: () => Promise; run: () => Promise>; id: string; - taskUuid: string; + taskExecutionId: string; stage: string; isEphemeral?: boolean; toString: () => string; + isSameTask: (executionId: string) => boolean; } export enum TaskRunningStage { @@ -187,10 +188,18 @@ export class TaskManagerRunner implements TaskRunner { } /** - * Gets the unique uuid of this task instance. + * Gets the execution id of this task instance. */ - public get taskUuid() { - return `${this.id}-${this.uuid}`; + public get taskExecutionId() { + return `${this.id}::${this.uuid}`; + } + + /** + * Test whether given execution ID identifies a different execution of this same task + * @param id + */ + public isSameTask(executionId: string) { + return executionId.startsWith(this.id); } /**