Skip to content

Commit

Permalink
Updating task uuid based on PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ymao1 committed Nov 15, 2021
1 parent 1787d61 commit 6851db2
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 17 deletions.
15 changes: 12 additions & 3 deletions x-pack/plugins/task_manager/server/task_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ describe('TaskPool', () => {
test('only allows one task with the same id in the task pool', async () => {
const logger = loggingSystemMock.create().get();
const pool = new TaskPool({
maxWorkers$: of(1),
maxWorkers$: of(2),
logger,
});

Expand All @@ -369,7 +369,13 @@ describe('TaskPool', () => {

const taskId = uuid.v4();
const task1 = mockTask({ id: taskId, run: shouldRun });
const task2 = mockTask({ id: taskId, run: shouldNotRun });
const task2 = mockTask({
id: taskId,
run: shouldNotRun,
isSameTask() {
return true;
},
});

await pool.run([task1]);
await pool.run([task2]);
Expand All @@ -388,7 +394,7 @@ describe('TaskPool', () => {
function mockTask(overrides = {}) {
return {
isExpired: false,
taskUuid: uuid.v4(),
taskExecutionId: uuid.v4(),
id: uuid.v4(),
cancel: async () => undefined,
markTaskAsRunning: jest.fn(async () => true),
Expand All @@ -409,6 +415,9 @@ describe('TaskPool', () => {
createTaskRunner: jest.fn(),
};
},
isSameTask() {
return false;
},
...overrides,
};
}
Expand Down
16 changes: 9 additions & 7 deletions x-pack/plugins/task_manager/server/task_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,19 @@ export class TaskPool {
tasksToRun
.filter(
(taskRunner) =>
!Array.from(this.tasksInPool.keys()).some((key) => key.startsWith(taskRunner.id))
!Array.from(this.tasksInPool.keys()).some((executionId: string) =>
taskRunner.isSameTask(executionId)
)
)
.map(async (taskRunner) => {
// We use taskRunner.taskUuid instead of taskRunner.id as key for the task pool map because
// We use taskRunner.taskExecutionId instead of taskRunner.id as key for the task pool map because
// task cancellation is a non-blocking procedure. We calculate the expiration and immediately remove
// the task from the task pool. There is a race condition that can occur when a recurring tasks's schedule
// matches its timeout value. A new instance of the task can be claimed and added to the task pool before
// the cancel function (meant for the previous instance of the task) is actually called. This means the wrong
// task instance is cancelled. We introduce the taskUuid to differentiate between these overlapping instances and
// task instance is cancelled. We introduce the taskExecutionId to differentiate between these overlapping instances and
// ensure that the correct task instance is cancelled.
this.tasksInPool.set(taskRunner.taskUuid, taskRunner);
this.tasksInPool.set(taskRunner.taskExecutionId, taskRunner);
return taskRunner
.markTaskAsRunning()
.then((hasTaskBeenMarkAsRunning: boolean) =>
Expand Down Expand Up @@ -175,12 +177,12 @@ export class TaskPool {
}
})
.then(() => {
this.tasksInPool.delete(taskRunner.taskUuid);
this.tasksInPool.delete(taskRunner.taskExecutionId);
});
}

private handleFailureOfMarkAsRunning(task: TaskRunner, err: Error) {
this.tasksInPool.delete(task.taskUuid);
this.tasksInPool.delete(task.taskExecutionId);
this.logger.error(`Failed to mark Task ${task.toString()} as running: ${err.message}`);
}

Expand Down Expand Up @@ -208,7 +210,7 @@ export class TaskPool {
private async cancelTask(task: TaskRunner) {
try {
this.logger.debug(`Cancelling task ${task.toString()}.`);
this.tasksInPool.delete(task.taskUuid);
this.tasksInPool.delete(task.taskExecutionId);
await task.cancel();
} catch (err) {
this.logger.error(`Failed to cancel task ${task.toString()}: ${err}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,18 @@ export class EphemeralTaskManagerRunner implements TaskRunner {
}

/**
* Gets the unique uuid of this task instance.
* Gets the exeuction id of this task instance.
*/
public get taskUuid() {
return `${this.id}-${this.uuid}`;
public get taskExecutionId() {
return `${this.id}::${this.uuid}`;
}

/**
* Test whether given execution ID identifies a different execution of this same task
* @param id
*/
public isSameTask(executionId: string) {
return executionId.startsWith(this.id);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ const minutesFromNow = (mins: number): Date => secondsFromNow(mins * 60);

let fakeTimer: sinon.SinonFakeTimers;

jest.mock('uuid', () => ({
v4: () => 'NEW_UUID',
}));

beforeAll(() => {
fakeTimer = sinon.useFakeTimers();
});
Expand All @@ -45,6 +49,19 @@ describe('TaskManagerRunner', () => {
end: jest.fn(),
};

test('execution ID', async () => {
const { runner } = await pendingStageSetup({
instance: {
id: 'foo',
taskType: 'bar',
},
});

expect(runner.taskExecutionId).toEqual(`foo::NEW_UUID`);
expect(runner.isSameTask(`foo::ANOTHER_UUID`)).toEqual(true);
expect(runner.isSameTask(`bar::ANOTHER_UUID`)).toEqual(false);
});

describe('Pending Stage', () => {
beforeEach(() => {
jest.clearAllMocks();
Expand Down
17 changes: 13 additions & 4 deletions x-pack/plugins/task_manager/server/task_running/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,11 @@ export interface TaskRunner {
markTaskAsRunning: () => Promise<boolean>;
run: () => Promise<Result<SuccessfulRunResult, FailedRunResult>>;
id: string;
taskUuid: string;
taskExecutionId: string;
stage: string;
isEphemeral?: boolean;
toString: () => string;
isSameTask: (executionId: string) => boolean;
}

export enum TaskRunningStage {
Expand Down Expand Up @@ -187,10 +188,18 @@ export class TaskManagerRunner implements TaskRunner {
}

/**
* Gets the unique uuid of this task instance.
* Gets the execution id of this task instance.
*/
public get taskUuid() {
return `${this.id}-${this.uuid}`;
public get taskExecutionId() {
return `${this.id}::${this.uuid}`;
}

/**
* Test whether given execution ID identifies a different execution of this same task
* @param id
*/
public isSameTask(executionId: string) {
return executionId.startsWith(this.id);
}

/**
Expand Down

0 comments on commit 6851db2

Please sign in to comment.