Skip to content

Commit

Permalink
fix(task-graph): fix to dependant cancellation
Browse files Browse the repository at this point in the history
Before this fix, dependants of a task were not cancelled if a failing
cache result already existed for it or one of its dependencies.

Also added more tests around dependant cancellation and the treatment of
cached, failing entries when adding nodes to the graph.
  • Loading branch information
thsig authored and edvald committed Feb 20, 2020
1 parent 9b35d74 commit 626f109
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 17 deletions.
56 changes: 40 additions & 16 deletions garden-service/src/task-graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,29 +214,53 @@ export class TaskGraph {
const id = node.getId()
const key = node.getKey()
const task = node.task

// If found, a node with the same key is already pending/in the index, so no node needs to be added.
const existing = this.index
.getNodes()
.filter((n) => n.getKey() === key && n.getId() !== id)
.reverse()[0]

if (existing) {
// A node with the same key is already pending/in the index, so no node needs to be added.
return null
} else {
const cachedResult = this.resultCache.get(node.getKey(), node.getVersion())
if (cachedResult && !task.force) {
// No need to add task or its dependencies.
const dependencyResults = <TaskResult[]>this.keysWithDependencies(node)
.map((k) => this.resultCache.getNewest(k))
.filter(Boolean)
this.provideCachedResultToInProgressBatches(cachedResult, dependencyResults)
}

const cachedResult = this.resultCache.get(node.getKey(), node.getVersion())
if (cachedResult && !task.force) {
if (cachedResult.error || this.hasFailedDependencyInCache(node)) {
this.cancelDependants(node)
for (const keyToCancel of this.keysWithDependencies(node)) {
this.cancelKeyForInProgressBatches(keyToCancel)
}
return null
} else {
return node
}
// No need to add task or its dependencies.
const dependencyResults = <TaskResult[]>this.keysWithDependencies(node)
.map((k) => this.resultCache.getNewest(k))
.filter(Boolean)
this.provideCachedResultToInProgressBatches(cachedResult, dependencyResults)
return null
} else {
return node
}
}

/**
* Returns true if one or more of node's dependencies has a cached result (and that dependency's task has
* force = false).
*
* Returns false otherwise.
*/
private hasFailedDependencyInCache(node: TaskNode): boolean {
for (const dep of node.getDependencies()) {
const cachedResult = this.resultCache.get(dep.getKey(), dep.getVersion())
if (!dep.task.force && cachedResult && cachedResult.error) {
return true
}
}
return false
}

/**
* This method implements the graph's main processing loop.
*
Expand Down Expand Up @@ -284,7 +308,7 @@ export class TaskGraph {
this.processNode(node).catch((error) => {
this.garden.events.emit("internalError", { error, timestamp: new Date() })
this.logInternalError(node, error)
this.cancelDependants(node.batchId, node)
this.cancelDependants(node)
})
}

Expand Down Expand Up @@ -329,7 +353,7 @@ export class TaskGraph {
result = { type, description, key, name, error, completedAt: new Date(), batchId }
this.garden.events.emit("taskError", result)
this.logTaskError(node, error)
this.cancelDependants(batchId, node)
this.cancelDependants(node)
} finally {
this.resultCache.put(key, node.getVersion(), result)
this.provideResultToInProgressBatches(result)
Expand Down Expand Up @@ -357,7 +381,7 @@ export class TaskGraph {
/**
* Recursively remove node's dependants, without removing node.
*/
private cancelDependants(batchId: string, node: TaskNode) {
private cancelDependants(node: TaskNode) {
const cancelledAt = new Date()
for (const dependant of this.getDependants(node)) {
this.logTaskComplete(dependant, false)
Expand All @@ -366,7 +390,7 @@ export class TaskGraph {
key: dependant.getKey(),
name: dependant.task.getName(),
type: dependant.getType(),
batchId,
batchId: node.batchId,
})
this.remove(dependant)
this.cancelKeyForInProgressBatches(dependant.getKey())
Expand All @@ -377,7 +401,7 @@ export class TaskGraph {
private getDependants(node: TaskNode): TaskNode[] {
const dependants = this.index
.getNodes()
.filter((n) => n.getRemainingDependencies().find((d) => d.getKey() === node.getKey()))
.filter((n) => n.getDependencies().find((d) => d.getKey() === node.getKey()))
return dependants.concat(flatten(dependants.map((d) => this.getDependants(d))))
}

Expand Down
122 changes: 121 additions & 1 deletion garden-service/test/unit/src/task-graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { join } from "path"
import { expect } from "chai"
import { BaseTask, TaskType } from "../../../src/tasks/base"
import { TaskGraph, TaskResult, TaskResults } from "../../../src/task-graph"
import { makeTestGarden, freezeTime, dataDir, expectError } from "../../helpers"
import { makeTestGarden, freezeTime, dataDir, expectError, TestGarden } from "../../helpers"
import { Garden } from "../../../src/garden"
import { deepFilter, defer, sleep } from "../../../src/util/util"
import uuid from "uuid"
Expand Down Expand Up @@ -699,6 +699,126 @@ describe("task-graph", () => {
])
})

context("if a cached, failing result exists for a task", () => {
let garden: TestGarden
let graph: TaskGraph

beforeEach(async () => {
garden = await getGarden()
graph = new TaskGraph(garden, garden.log)

const opts = { versionString: "1" }

const taskA = new TestTask(garden, "a", false, { ...opts, uid: "a1" })
const taskB = new TestTask(garden, "b", false, { ...opts, uid: "b1", throwError: true, dependencies: [taskA] })
const taskC = new TestTask(garden, "c", false, { ...opts, uid: "c1", dependencies: [taskB] })
const taskD = new TestTask(garden, "d", false, { ...opts, uid: "d1", dependencies: [taskB, taskC] })

await graph.process([taskA, taskB, taskC, taskD])

garden.events.eventLog = []
})

it("should recursively cancel dependants if it has a cached, failing result at the same version", async () => {
const repOpts = { versionString: "1" }

const repTaskA = new TestTask(garden, "a", false, { ...repOpts, uid: "a2" })
const repTaskB = new TestTask(garden, "b", false, {
...repOpts,
uid: "b2",
throwError: true,
dependencies: [repTaskA],
})
const repTaskC = new TestTask(garden, "c", false, { ...repOpts, uid: "c2", dependencies: [repTaskB] })
const repTaskD = new TestTask(garden, "d", false, { ...repOpts, uid: "d2", dependencies: [repTaskB, repTaskC] })

await graph.process([repTaskA, repTaskB, repTaskC, repTaskD])

const filteredEventLog = garden.events.eventLog.map((e) => {
return { name: e.name, payload: deepFilter(e.payload, (_, key) => key === "key") }
})

expect(filteredEventLog).to.eql([
{ name: "taskGraphProcessing", payload: {} },
{ name: "taskComplete", payload: { key: "a" } },
{ name: "taskError", payload: { key: "b" } },
{ name: "taskCancelled", payload: { key: "c" } },
{ name: "taskCancelled", payload: { key: "d" } },
{ name: "taskCancelled", payload: { key: "c" } },
{ name: "taskGraphComplete", payload: {} },
])
})

it("should run a task with force = true if it has a cached, failing result at the same version", async () => {
const repOpts = { versionString: "1" }

const repTaskA = new TestTask(garden, "a", true, { ...repOpts, uid: "a2" })
const repTaskB = new TestTask(garden, "b", true, {
...repOpts,
uid: "b2",
throwError: true,
dependencies: [repTaskA],
})
const repTaskC = new TestTask(garden, "c", true, { ...repOpts, uid: "c2", dependencies: [repTaskB] })
const repTaskD = new TestTask(garden, "d", true, { ...repOpts, uid: "d2", dependencies: [repTaskB, repTaskC] })

await graph.process([repTaskA, repTaskB, repTaskC, repTaskD])

const filteredEventLog = garden.events.eventLog.map((e) => {
return { name: e.name, payload: deepFilter(e.payload, (_, key) => key === "key") }
})

expect(filteredEventLog).to.eql([
{ name: "taskGraphProcessing", payload: {} },
{ name: "taskPending", payload: { key: "a" } },
{ name: "taskPending", payload: { key: "b" } },
{ name: "taskPending", payload: { key: "c" } },
{ name: "taskPending", payload: { key: "d" } },
{ name: "taskProcessing", payload: { key: "a" } },
{ name: "taskComplete", payload: { key: "a" } },
{ name: "taskProcessing", payload: { key: "b" } },
{ name: "taskError", payload: { key: "b" } },
{ name: "taskCancelled", payload: { key: "c" } },
{ name: "taskCancelled", payload: { key: "d" } },
{ name: "taskCancelled", payload: { key: "d" } },
{ name: "taskGraphComplete", payload: {} },
])
})

it("should run a task if it has a cached, failing result at a different version", async () => {
const repOpts = { versionString: "2" }

const repTaskA = new TestTask(garden, "a", false, { ...repOpts, uid: "a2" })
// No error this time
const repTaskB = new TestTask(garden, "b", false, { ...repOpts, uid: "b2", dependencies: [repTaskA] })
const repTaskC = new TestTask(garden, "c", false, { ...repOpts, uid: "c2", dependencies: [repTaskB] })
const repTaskD = new TestTask(garden, "d", false, { ...repOpts, uid: "d2", dependencies: [repTaskB, repTaskC] })

await graph.process([repTaskA, repTaskB, repTaskC, repTaskD])

const filteredEventLog = garden.events.eventLog.map((e) => {
return { name: e.name, payload: deepFilter(e.payload, (_, key) => key === "key") }
})

expect(filteredEventLog).to.eql([
{ name: "taskGraphProcessing", payload: {} },
{ name: "taskPending", payload: { key: "a" } },
{ name: "taskPending", payload: { key: "b" } },
{ name: "taskPending", payload: { key: "c" } },
{ name: "taskPending", payload: { key: "d" } },
{ name: "taskProcessing", payload: { key: "a" } },
{ name: "taskComplete", payload: { key: "a" } },
{ name: "taskProcessing", payload: { key: "b" } },
{ name: "taskComplete", payload: { key: "b" } },
{ name: "taskProcessing", payload: { key: "c" } },
{ name: "taskComplete", payload: { key: "c" } },
{ name: "taskProcessing", payload: { key: "d" } },
{ name: "taskComplete", payload: { key: "d" } },
{ name: "taskGraphComplete", payload: {} },
])
})
})

describe("partition", () => {
it("should partition a task list into unrelated batches", async () => {
const garden = await getGarden()
Expand Down

0 comments on commit 626f109

Please sign in to comment.