From 838039705414b2e85eb1536f888f7fd1f45f345d Mon Sep 17 00:00:00 2001 From: Thorarinn Sigurdsson Date: Thu, 11 Jul 2019 19:04:40 +0200 Subject: [PATCH] fix(task-graph): use latest version for dedup Before this fix, the first task to become pending for a given key was the one chosen by the deduplication process, instead of the last added task. Now, when a task is added and there's already a pending task with the same key, the previously pending task is effectively replaced with the new task, ensuring that the last added task is the one that gets processed. --- garden-service/src/task-graph.ts | 14 +++++++++++++- garden-service/test/unit/src/task-graph.ts | 21 +++++++++++++-------- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/garden-service/src/task-graph.ts b/garden-service/src/task-graph.ts index 925237e2fd..6865e0ae3f 100644 --- a/garden-service/src/task-graph.ts +++ b/garden-service/src/task-graph.ts @@ -49,6 +49,13 @@ export class TaskGraph { private roots: TaskNodeMap private index: TaskNodeMap private inProgress: TaskNodeMap + + /** + * latestTasks[key] is the most recently requested task (via process) for that key. + * We use this table to ensure that the last requested task version is used as + * we deduplicate tasks by key. + */ + private latestTasks: { [key: string]: BaseTask } private pendingKeys: Set private logEntryMap: LogEntryMap @@ -66,6 +73,7 @@ export class TaskGraph { this.roots = new TaskNodeMap() this.index = new TaskNodeMap() this.inProgress = new TaskNodeMap() + this.latestTasks = {} this.pendingKeys = new Set() this.taskDependencyCache = {} this.resultCache = new ResultCache() @@ -74,6 +82,10 @@ export class TaskGraph { } async process(tasks: BaseTask[]): Promise { + for (const t of tasks) { + this.latestTasks[t.getKey()] = t + } + // We want at most one pending (i.e. not in-progress) task for a given key at any given time, // so we deduplicate here. const tasksToProcess = tasks.filter(t => !this.pendingKeys.has(t.getKey())) @@ -153,7 +165,7 @@ export class TaskGraph { */ private async processTasksInternal(tasks: BaseTask[], resultKeys: string[]): Promise { for (const task of tasks) { - await this.addTask(task) + await this.addTask(this.latestTasks[task.getKey()]) } this.log.silly("") diff --git a/garden-service/test/unit/src/task-graph.ts b/garden-service/test/unit/src/task-graph.ts index 92e6235afc..4a9ef2212f 100644 --- a/garden-service/test/unit/src/task-graph.ts +++ b/garden-service/test/unit/src/task-graph.ts @@ -323,34 +323,39 @@ describe("task-graph", () => { const garden = await getGarden() const graph = new TaskGraph(garden, garden.log) - let processCount = 0 + const processedVersions: string[] = [] const { promise: t1StartedPromise, resolver: t1StartedResolver } = defer() const { promise: t1DonePromise, resolver: t1DoneResolver } = defer() const t1 = new TestTask(garden, "a", false, { versionString: "1", + uid: "1", callback: async () => { t1StartedResolver() + processedVersions.push("1") await t1DonePromise - processCount++ }, }) - const repeatedCallback = async () => { processCount++ } - const t2 = new TestTask(garden, "a", false, { versionString: "2", callback: repeatedCallback }) - const t3 = new TestTask(garden, "a", false, { versionString: "3", callback: repeatedCallback }) + const repeatedCallback = (version: string) => { + return async () => { + processedVersions.push(version) + } + } + const t2 = new TestTask(garden, "a", false, { uid: "2", versionString: "2", callback: repeatedCallback("2") }) + const t3 = new TestTask(garden, "a", false, { uid: "3", versionString: "3", callback: repeatedCallback("3") }) const firstProcess = graph.process([t1]) - // We make sure t1 is being processed before adding t2 and t3. This way, one of them - // (but not both) should be scheduled after t1 finishes, resulting in a processCount of 2. + // We make sure t1 is being processed before adding t2 and t3. Since t3 is added after t2, + // only t1 and t3 should be processed (since t2 and t3 have the same key, "a"). await t1StartedPromise const secondProcess = graph.process([t2]) const thirdProcess = graph.process([t3]) t1DoneResolver() await Bluebird.all([firstProcess, secondProcess, thirdProcess]) - expect(processCount).to.eq(2) + expect(processedVersions).to.eql(["1", "3"]) }) it("should recursively cancel a task's dependants when it throws an error", async () => {