Skip to content

Commit

Permalink
fix: Cancel dependants on task error.
Browse files Browse the repository at this point in the history
When an exception occurs while a task is being processed, its dependants
are now recursively removed.
  • Loading branch information
thsig committed May 23, 2018
1 parent a8f8712 commit 6831608
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 23 deletions.
31 changes: 25 additions & 6 deletions src/task-graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ export class TaskGraph {
const type = node.getType()
const baseKey = node.getBaseKey()
const description = node.getDescription()
let result

try {
this.logTask(node)
Expand All @@ -154,17 +155,17 @@ export class TaskGraph {
this.resultCache.pick(dependencyBaseKeys),
pick(results, dependencyBaseKeys))

let result
try {
result = await node.process(dependencyResults)
} catch (error) {
result = { type, description, error }
this.cancelDependants(node)
} finally {
results[baseKey] = result
this.resultCache.put(baseKey, task.version.versionString, result)
}
} finally {
this.completeTask(node)
this.completeTask(node, !result.error)
}

return loop()
Expand All @@ -176,7 +177,7 @@ export class TaskGraph {
return results
}

private completeTask(node: TaskNode) {
private completeTask(node: TaskNode, success: boolean) {
if (node.getDependencies().length > 0) {
throw new TaskGraphError(`Task ${node.getKey()} still has unprocessed dependencies`)
}
Expand All @@ -190,7 +191,7 @@ export class TaskGraph {
}

this.remove(node)
this.logTaskComplete(node)
this.logTaskComplete(node, success)
}

private getPredecessor(task: Task): TaskNode | null {
Expand Down Expand Up @@ -245,6 +246,22 @@ export class TaskGraph {
this.inProgress.removeNode(node)
}

// Recursively remove node's dependants, without removing node.
private cancelDependants(node: TaskNode) {
const remover = (n) => {
for (const dependant of n.getDependants()) {
this.logTaskComplete(n, false)
remover(dependant)
}
this.remove(n)
}

for (const dependant of node.getDependants()) {
node.removeDependant(dependant)
remover(dependant)
}
}

// Logging
private logTask(node: TaskNode) {
const entry = this.ctx.log.debug({
Expand All @@ -255,9 +272,11 @@ export class TaskGraph {
this.logEntryMap[node.getKey()] = entry
}

private logTaskComplete(node: TaskNode) {
private logTaskComplete(node: TaskNode, success: boolean) {
const entry = this.logEntryMap[node.getKey()]
entry && entry.setSuccess()
if (entry) {
success ? entry.setSuccess() : entry.setError()
}
this.logEntryMap.counter.setState(remainingTasksToStr(this.index.length))
}

Expand Down
87 changes: 70 additions & 17 deletions test/src/task-graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,25 @@ import {

const projectRoot = join(__dirname, "..", "data", "test-project-empty")

type TestTaskCallback = (name: string, result: any) => Promise<void>

interface TestTaskOptions {
callback?: TestTaskCallback
id?: string
throwError?: boolean
}

class TestTask extends Task {
type = "test"
name: string
callback: TestTaskCallback | null
id: string
throwError: boolean

constructor(
name: string,
dependencies?: Task[],
private callback?: (name: string, result: any) => Promise<void>,
id: string = "",
options?: TestTaskOptions,
) {
super({
version: {
Expand All @@ -29,7 +38,9 @@ class TestTask extends Task {
},
})
this.name = name
this.id = id
this.callback = (options && options.callback) || null
this.id = (options && options.id) || ""
this.throwError = !!(options && options.throwError)

if (dependencies) {
this.dependencies = dependencies
Expand All @@ -48,10 +59,6 @@ class TestTask extends Task {
return this.id ? `${this.name}.${this.id}` : this.name
}

async getVersion() {
return this.version
}

getDescription() {
return this.getKey()
}
Expand All @@ -63,6 +70,10 @@ class TestTask extends Task {
await this.callback(this.getKey(), result.result)
}

if (this.throwError) {
throw new Error()
}

return result
}
}
Expand Down Expand Up @@ -110,10 +121,12 @@ describe("task-graph", () => {
callbackResults[key] = result
}

const taskA = new TestTask("a", [], callback)
const taskB = new TestTask("b", [taskA], callback)
const taskC = new TestTask("c", [taskB], callback)
const taskD = new TestTask("d", [taskB, taskC], callback)
const opts = { callback }

const taskA = new TestTask("a", [], opts)
const taskB = new TestTask("b", [taskA], opts)
const taskC = new TestTask("c", [taskB], opts)
const taskD = new TestTask("d", [taskB, taskC], opts)

// we should be able to add tasks multiple times and in any order
await graph.addTask(taskC)
Expand Down Expand Up @@ -188,6 +201,46 @@ describe("task-graph", () => {
})
})

it("should recursively cancel a task's dependants when it throws an error", async() => {
const ctx = await getContext()
const graph = new TaskGraph(ctx)

const callbackResults = {}
const resultOrder: string[] = []

const callback = async (key: string, result: any) => {
resultOrder.push(key)
}

const opts = { callback }

const taskA = new TestTask("a", [], opts)
const taskB = new TestTask("b", [taskA], { callback, throwError: true})
const taskC = new TestTask("c", [taskB], opts)
const taskD = new TestTask("d", [taskB, taskC], opts)

await graph.addTask(taskA)
await graph.addTask(taskB)
await graph.addTask(taskC)
await graph.addTask(taskD)

const results = await graph.processTasks()

const resultA: TaskResult = {
type: "test",
description: "a",
output: {
result: "result-a",
dependencyResults: {},
},
dependencyResults: {},
}

expect(results.a).to.eql(resultA)
expect(results.b).to.have.property("error")
expect(resultOrder).to.eql(["a", "b"])
})

it.skip(
"should process a task as an inheritor of an existing, in-progress task when they have the same base key",
async () =>
Expand Down Expand Up @@ -231,13 +284,13 @@ describe("task-graph", () => {
callbackResults[key] = result
}

const dependencyA = new TestTask("dependencyA", [], defaultCallback)
const dependencyB = new TestTask("dependencyB", [], defaultCallback)
const parentTask = new TestTask("sharedName", [dependencyA, dependencyB], parentCallback, "1")
const dependantA = new TestTask("dependantA", [parentTask], defaultCallback)
const dependantB = new TestTask("dependantB", [parentTask], defaultCallback)
const dependencyA = new TestTask("dependencyA", [], {callback: defaultCallback})
const dependencyB = new TestTask("dependencyB", [], {callback: defaultCallback})
const parentTask = new TestTask("sharedName", [dependencyA, dependencyB], {callback: parentCallback, id: "1"})
const dependantA = new TestTask("dependantA", [parentTask], {callback: defaultCallback})
const dependantB = new TestTask("dependantB", [parentTask], {callback: defaultCallback})

const inheritorTask = new TestTask("sharedName", [dependencyA, dependencyB], defaultCallback, "2")
const inheritorTask = new TestTask("sharedName", [dependencyA, dependencyB], {callback: defaultCallback, id: "2"})

await graph.addTask(dependencyA)
await graph.addTask(dependencyB)
Expand Down

0 comments on commit 6831608

Please sign in to comment.