diff --git a/examples/vote-helm/garden.yml b/examples/vote-helm/garden.yml index 2f93462b44..3ca6f97a0a 100644 --- a/examples/vote-helm/garden.yml +++ b/examples/vote-helm/garden.yml @@ -11,4 +11,4 @@ providers: context: gke_garden-dev-200012_europe-west1-b_garden-dev-1 namespace: vote-helm-testing-${local.env.CIRCLE_BUILD_NUM || local.username} defaultHostname: vote-helm-testing.dev-1.sys.garden - buildMode: cluster-docker \ No newline at end of file + buildMode: cluster-docker diff --git a/garden-service/src/task-graph.ts b/garden-service/src/task-graph.ts index 061cf0e171..c40f02d822 100644 --- a/garden-service/src/task-graph.ts +++ b/garden-service/src/task-graph.ts @@ -59,25 +59,16 @@ export class TaskGraph { private index: TaskNodeMap private inProgress: TaskNodeMap - private pendingBatches: TaskBatch[] - private inProgressBatches: TaskBatch[] + private pendingBatches: TaskNodeBatch[] + private inProgressBatches: TaskNodeBatch[] /** - * latestTasks[key] is the most recently requested task (via process) for that key. - * We use this table to ensure that the last requested task version is used as - * we deduplicate tasks by key. + * latestNodes[key] is the node for the most recently requested task (via process) for that key. + * We use this map to ensure that the last requested task version is used as we deduplicate + * tasks nodes by key. */ - private latestTasks: { [key: string]: BaseTask } - private pendingKeys: Set - + private latestNodes: { [key: string]: TaskNode } private logEntryMap: LogEntryMap - - /** - * A given task instance (uniquely identified by its id) should always return the same - * list of dependencies (by key) from its getDependencies method. - */ - private taskDependencyCache: { [id: string]: BaseTask[] } - private resultCache: ResultCache constructor(private garden: Garden, private log: LogEntry) { @@ -86,21 +77,22 @@ export class TaskGraph { this.inProgress = new TaskNodeMap() this.pendingBatches = [] this.inProgressBatches = [] - this.latestTasks = {} - this.pendingKeys = new Set() - this.taskDependencyCache = {} + this.latestNodes = {} this.resultCache = new ResultCache() this.logEntryMap = {} } async process(tasks: BaseTask[], opts?: ProcessTasksOpts): Promise { - for (const t of tasks) { - this.latestTasks[t.getKey()] = t + const unlimitedConcurrency = opts ? !!opts.unlimitedConcurrency : false + const nodes = await this.nodesWithDependencies(tasks, unlimitedConcurrency) + + const batches = this.partition(nodes, { unlimitedConcurrency }) + for (const batch of batches) { + for (const node of batch.nodes) { + this.latestNodes[node.getKey()] = node + } } - await this.populateTaskDependencyCache(tasks) - const unlimitedConcurrency = opts ? !!opts.unlimitedConcurrency : false - const batches = this.partition(tasks, { unlimitedConcurrency }) this.pendingBatches.push(...batches) this.processGraph() @@ -129,40 +121,36 @@ export class TaskGraph { return results } + async nodesWithDependencies(tasks: BaseTask[], unlimitedConcurrency = false): Promise { + return Bluebird.map(tasks, async (task) => { + const depNodes = await this.nodesWithDependencies(await task.getDependencies(), unlimitedConcurrency) + return new TaskNode(task, depNodes, unlimitedConcurrency) + }) + } + /** - * Returns an array of TaskBatches, where each batch consists of tasks that share one or more dependencies (or are - * a dependency of another task in their batch). + * Returns an array of TaskNodeBatches, where each batch consists of nodes that share one or more dependencies (or are + * a dependency of another node in their batch). * - * Also deduplicates tasks by key + version. + * Also deduplicates nodes by node key + version. */ - partition(tasks: BaseTask[], { unlimitedConcurrency = false }): TaskBatch[] { - const deduplicatedTasks = uniqWith(tasks, (t1, t2) => { - return t1.getKey() === t2.getKey() && t1.version.versionString === t2.version.versionString + partition(nodes: TaskNode[], { unlimitedConcurrency = false }): TaskNodeBatch[] { + const deduplicatedNodes = uniqWith(nodes, (n1, n2) => { + return n1.getKey() === n2.getKey() && n1.getVersion() === n2.getVersion() }) - const tasksWithKeys = deduplicatedTasks.map((task) => { - return { task, resultKeys: this.keysWithDependencies(task) } + const nodesWithKeys = deduplicatedNodes.map((node) => { + return { node, resultKeys: this.keysWithDependencies(node) } }) - const sharesDeps = (task1withKeys, task2withKeys) => { - return intersection(task1withKeys.resultKeys, task2withKeys.resultKeys).length > 0 + const sharesDeps = (node1withKeys, node2withKeys) => { + return intersection(node1withKeys.resultKeys, node2withKeys.resultKeys).length > 0 } - return relationshipClasses(tasksWithKeys, sharesDeps).map((cls) => { - const tasksForBatch = cls.map((t) => t.task) + return relationshipClasses(nodesWithKeys, sharesDeps).map((cls) => { + const nodesForBatch = cls.map((n) => n.node) const resultKeys: string[] = union(...cls.map((ts) => ts.resultKeys)) - return new TaskBatch(tasksForBatch, resultKeys, unlimitedConcurrency) - }) - } - - /** - * Populates this.taskDependencyCache for tasks and its dependencies, recursively. - */ - async populateTaskDependencyCache(tasks: BaseTask[]): Promise { - await Bluebird.map(tasks, async (task) => { - const deps = await task.getDependencies() - this.taskDependencyCache[task.getId()] = deps - await this.populateTaskDependencyCache(deps) + return new TaskNodeBatch(nodesForBatch, resultKeys, unlimitedConcurrency) }) } @@ -172,63 +160,74 @@ export class TaskGraph { private rebuild() { const taskNodes = this.index.getNodes() - // this.taskDependencyCache will already have been populated at this point (happens in process). for (const node of taskNodes) { /** * We set the list of dependency nodes to the intersection of the set of nodes in this.index with * the node's task's dependencies (from configuration). */ - node.clear() - const taskDeps = this.taskDependencyCache[node.getId()] - node.setDependencies(taskNodes.filter((n) => taskDeps.find((d) => d.getKey() === n.getKey()))) + node.clearRemainingDependencies() + const deps = node.getDependencies() + node.setRemainingDependencies(taskNodes.filter((n) => deps.find((d) => d.getKey() === n.getKey()))) } - const newRootNodes = taskNodes.filter((n) => n.getDependencies().length === 0) + const newRootNodes = taskNodes.filter((n) => n.getRemainingDependencies().length === 0) this.roots.clear() this.roots.setNodes(newRootNodes) } - private addTask(batchId: string, task: BaseTask, unlimitedConcurrency: boolean) { - this.addNodeWithDependencies(task, batchId, unlimitedConcurrency) + private addNode(node: TaskNode) { + this.addNodeWithDependencies(node) this.rebuild() - if (this.index.getNode(task)) { + if (this.index.contains(node)) { + const task = node.task this.garden.events.emit("taskPending", { addedAt: new Date(), - batchId, - key: task.getKey(), + batchId: node.batchId, + key: node.getKey(), name: task.getName(), type: task.type, }) } else { - const result = this.resultCache.get(task.getKey(), task.version.versionString) + const result = this.resultCache.get(node.getKey(), node.getVersion()) if (result) { this.garden.events.emit("taskComplete", result) } } } - private getNode(task: BaseTask, batchId: string, unlimitedConcurrency: boolean): TaskNode | null { - const id = task.getId() - const key = task.getKey() + private addNodeWithDependencies(node: TaskNode) { + const nodeToAdd = this.getNodeToAdd(node) + if (nodeToAdd) { + this.index.addNode(nodeToAdd) + for (const depNode of nodeToAdd.getDependencies()) { + this.addNodeWithDependencies(depNode) + } + } + } + + private getNodeToAdd(node: TaskNode): TaskNode | null { + const id = node.getId() + const key = node.getKey() + const task = node.task const existing = this.index .getNodes() .filter((n) => n.getKey() === key && n.getId() !== id) .reverse()[0] if (existing) { - // A task with the same key is already pending. - return existing + // A node with the same key is already pending/in the index, so no node needs to be added. + return null } else { - const cachedResult = this.resultCache.get(task.getKey(), task.version.versionString) + const cachedResult = this.resultCache.get(node.getKey(), node.getVersion()) if (cachedResult && !task.force) { // No need to add task or its dependencies. - const dependencyResults = this.keysWithDependencies(task) + const dependencyResults = this.keysWithDependencies(node) .map((k) => this.resultCache.getNewest(k)) .filter(Boolean) this.provideCachedResultToInProgressBatches(cachedResult, dependencyResults) return null } else { - return new TaskNode(task, batchId, unlimitedConcurrency) + return node } } } @@ -276,9 +275,6 @@ export class TaskGraph { this.rebuild() this.initLogging() - for (const node of nodesToProcess) { - this.pendingKeys.delete(node.getKey()) - } for (const node of nodesToProcess) { this.processNode(node).catch((error) => { this.garden.events.emit("internalError", { error, timestamp: new Date() }) @@ -290,25 +286,6 @@ export class TaskGraph { this.rebuild() } - private addNode(task: BaseTask, batchId: string, unlimitedConcurrency: boolean): TaskNode | null { - const node = this.getNode(task, batchId, unlimitedConcurrency) - if (node) { - this.index.addNode(node) - } - return node - } - - private addNodeWithDependencies(task: BaseTask, batchId: string, unlimitedConcurrency: boolean) { - const node = this.addNode(task, batchId, unlimitedConcurrency) - - if (node) { - const depTasks = this.taskDependencyCache[node.getId()] - for (const dep of depTasks) { - this.addNodeWithDependencies(dep, batchId, unlimitedConcurrency) - } - } - } - /** * Processes a single TaskNode to completion, handling errors and providing its result to in-progress task batches. */ @@ -328,7 +305,7 @@ export class TaskGraph { this.logTask(node) this.logEntryMap.inProgress.setState(inProgressToStr(this.inProgress.getNodes())) - const dependencyBaseKeys = this.taskDependencyCache[task.getId()].map((dep) => dep.getKey()) + const dependencyBaseKeys = node.getDependencies().map((dep) => dep.getKey()) const dependencyResults = this.resultCache.pick(dependencyBaseKeys) try { @@ -340,7 +317,7 @@ export class TaskGraph { startedAt: new Date(), version: task.version, }) - result = await node.process(dependencyResults, node.batchId) + result = await node.process(dependencyResults) this.garden.events.emit("taskComplete", result) } catch (error) { success = false @@ -349,7 +326,7 @@ export class TaskGraph { this.logTaskError(node, error) this.cancelDependants(batchId, node) } finally { - this.resultCache.put(key, task.version.versionString, result) + this.resultCache.put(key, node.getVersion(), result) this.provideResultToInProgressBatches(result) } } finally { @@ -359,10 +336,9 @@ export class TaskGraph { } private completeTask(node: TaskNode, success: boolean) { - if (node.getDependencies().length > 0) { + if (node.getRemainingDependencies().length > 0) { throw new TaskGraphError(`Task ${node.getId()} still has unprocessed dependencies`, { node }) } - this.remove(node) this.logTaskComplete(node, success) this.rebuild() @@ -371,7 +347,6 @@ export class TaskGraph { private remove(node: TaskNode) { this.index.removeNode(node) this.inProgress.removeNode(node) - this.pendingKeys.delete(node.getKey()) } /** @@ -397,7 +372,7 @@ export class TaskGraph { private getDependants(node: TaskNode): TaskNode[] { const dependants = this.index .getNodes() - .filter((n) => n.getDependencies().find((d) => d.getKey() === node.getKey())) + .filter((n) => n.getRemainingDependencies().find((d) => d.getKey() === node.getKey())) return dependants.concat(flatten(dependants.map((d) => this.getDependants(d)))) } @@ -410,9 +385,8 @@ export class TaskGraph { * We want at most one pending (i.e. not in-progress) task for a given key at any given time, * so we deduplicate here. */ - const tasksToProcess = batch.tasks.filter((t) => !this.pendingKeys.has(t.getKey())) - for (const task of tasksToProcess) { - this.addTask(batch.id, this.latestTasks[task.getKey()], batch.unlimitedConcurrency) + for (const node of batch.nodes) { + this.addNode(this.latestNodes[node.getKey()]) } } @@ -423,10 +397,12 @@ export class TaskGraph { * Find any pending task batches that are disjoint with all in-progress batches, and mutually disjoint among * themselves (preferring to add older batches first, i.e. lower-indexed in this.pendingBatches). */ - private pickDisjointPendingBatches(): TaskBatch[] { - const pickedBatches: TaskBatch[] = [] + private pickDisjointPendingBatches(): TaskNodeBatch[] { + const pickedBatches: TaskNodeBatch[] = [] - const disjointFromAll = (batches: TaskBatch[], candidate: TaskBatch) => every(batches, (b) => b.disjoint(candidate)) + const disjointFromAll = (batches: TaskNodeBatch[], candidate: TaskNodeBatch) => { + return every(batches, (b) => b.disjoint(candidate)) + } for (const pending of this.pendingBatches) { if (disjointFromAll(this.inProgressBatches, pending) && disjointFromAll(pickedBatches, pending)) { @@ -438,7 +414,7 @@ export class TaskGraph { } private provideResultToInProgressBatches(result: TaskResult) { - const finished: TaskBatch[] = [] + const finished: TaskNodeBatch[] = [] for (const batch of this.inProgressBatches) { const batchFinished = batch.taskFinished(result) if (batchFinished) { @@ -449,7 +425,7 @@ export class TaskGraph { } private provideCachedResultToInProgressBatches(result: TaskResult, depResults: TaskResult[]) { - const finished: TaskBatch[] = [] + const finished: TaskNodeBatch[] = [] for (const batch of this.inProgressBatches) { const batchFinished = batch.taskCached(result, depResults) if (batchFinished) { @@ -460,7 +436,7 @@ export class TaskGraph { } private cancelKeyForInProgressBatches(key: string) { - const finished: TaskBatch[] = [] + const finished: TaskNodeBatch[] = [] for (const batch of this.inProgressBatches) { const batchFinished = batch.cancelKey(key) if (batchFinished) { @@ -471,21 +447,19 @@ export class TaskGraph { } /** - * Returns the keys of task and its dependencies, recursively. - * - * Expects this.taskDependencyCache to have been populated for tasks and their dependencies. + * Returns the keys of node and its dependencies, recursively. */ - private keysWithDependencies(task: BaseTask): string[] { + private keysWithDependencies(node: TaskNode): string[] { const keySet = new Set() - const getKeys = (t: BaseTask, keys: Set) => { - keys.add(t.getKey()) - for (const dep of this.taskDependencyCache[t.getId()]) { + const getKeys = (n: TaskNode, keys: Set) => { + keys.add(n.getKey()) + for (const dep of n.getDependencies()) { getKeys(dep, keys) } } - getKeys(task, keySet) + getKeys(node, keySet) return [...keySet] } @@ -582,7 +556,7 @@ function metadataForLog(task: BaseTask, status: TaskLogStatus): LogEntryMetadata class TaskNodeMap { // Map is used here to facilitate in-order traversal. - index: Map + index: Map // Keys are task ids length: number constructor() { @@ -642,29 +616,46 @@ class TaskNodeMap { class TaskNode { task: BaseTask - batchId: string + batchId: string // Set in TaskNodeBatch's constructor unlimitedConcurrency: boolean - private dependencies: TaskNodeMap + /** + * The initial dependencies of this node, equivalent to node.task.getDependencies() + */ + private readonly dependencies: TaskNodeMap - constructor(task: BaseTask, batchId: string, unlimitedConcurrency: boolean) { + /** + * Those of this node's dependencies that are in the graph's index (i.e. are scheduled for processing). + * + * This field is updated in TaskGraph's rebuild method. + */ + private remainingDependencies: TaskNodeMap + + constructor(task: BaseTask, dependencies: TaskNode[], unlimitedConcurrency: boolean) { this.task = task - this.batchId = batchId - this.unlimitedConcurrency = unlimitedConcurrency this.dependencies = new TaskNodeMap() + this.unlimitedConcurrency = unlimitedConcurrency + this.dependencies.setNodes(dependencies) + this.remainingDependencies = new TaskNodeMap() + this.dependencies.setNodes(dependencies) } - clear() { - this.dependencies.clear() + getDependencies() { + return this.dependencies.getNodes() } - setDependencies(nodes: TaskNode[]) { + setRemainingDependencies(nodes: TaskNode[]) { for (const node of nodes) { - this.dependencies.addNode(node) + this.remainingDependencies.addNode(node) } } - getDependencies() { - return this.dependencies.getNodes() + + getRemainingDependencies() { + return this.remainingDependencies.getNodes() + } + + clearRemainingDependencies() { + this.remainingDependencies.clear() } getKey() { @@ -683,15 +674,20 @@ class TaskNode { return this.task.type } + getVersion() { + return this.task.version.versionString + } + // For testing/debugging purposes inspect(): object { return { id: this.getId(), dependencies: this.getDependencies().map((d) => d.inspect()), + remainingDependencies: this.getRemainingDependencies().map((d) => d.inspect()), } } - async process(dependencyResults: TaskResults, batchId: string): Promise { + async process(dependencyResults: TaskResults): Promise { const output = await this.task.process(dependencyResults) return { @@ -700,7 +696,7 @@ class TaskNode { name: this.task.getName(), description: this.getDescription(), completedAt: new Date(), - batchId, + batchId: this.batchId, output, dependencyResults, } @@ -754,15 +750,15 @@ class ResultCache { } } -export class TaskBatch { +export class TaskNodeBatch { public id: string - public tasks: BaseTask[] + public nodes: TaskNode[] public unlimitedConcurrency: boolean /** - * The keys of tasks and their dependencies, recursively. + * The keys of nodes and their dependencies, recursively. * * We want to return results for all these keys, regardless of whether there's a cached result or an - * already pending task for a given key. + * already pending node for a given key. */ public resultKeys: string[] public remainingResultKeys: Set @@ -771,11 +767,12 @@ export class TaskBatch { private resolver: any /** - * keys should be the set union of the keys of tasks and those of their dependencies, recursively. + * resultKeys should be the set union of the keys of nodes and those of their dependencies, recursively. */ - constructor(tasks: BaseTask[], resultKeys: string[], unlimitedConcurrency = false) { + constructor(nodes: TaskNode[], resultKeys: string[], unlimitedConcurrency = false) { this.id = uuid.v4() - this.tasks = tasks + this.setBatchId(nodes) + this.nodes = nodes this.unlimitedConcurrency = unlimitedConcurrency this.resultKeys = resultKeys this.remainingResultKeys = new Set(resultKeys) @@ -785,7 +782,14 @@ export class TaskBatch { this.resolver = resolver } - disjoint(otherBatch: TaskBatch): boolean { + setBatchId(nodes: TaskNode[]) { + for (const node of nodes) { + node.batchId = this.id + this.setBatchId(node.getDependencies()) + } + } + + disjoint(otherBatch: TaskNodeBatch): boolean { return intersection(this.resultKeys, otherBatch.resultKeys).length === 0 } diff --git a/garden-service/test/unit/src/task-graph.ts b/garden-service/test/unit/src/task-graph.ts index 1ebaf71c6b..0ad87c17f8 100644 --- a/garden-service/test/unit/src/task-graph.ts +++ b/garden-service/test/unit/src/task-graph.ts @@ -13,7 +13,7 @@ import { BaseTask, TaskType } from "../../../src/tasks/base" import { TaskGraph, TaskResult, TaskResults } from "../../../src/task-graph" import { makeTestGarden, freezeTime, dataDir, expectError } from "../../helpers" import { Garden } from "../../../src/garden" -import { deepFilter, defer } from "../../../src/util/util" +import { deepFilter, defer, sleep } from "../../../src/util/util" import uuid from "uuid" const projectRoot = join(dataDir, "test-project-empty") @@ -448,6 +448,7 @@ describe("task-graph", () => { await t1StartedPromise const secondProcess = graph.process([t2]) const thirdProcess = graph.process([t3]) + await sleep(200) // TODO: Get rid of this? t1DoneResolver() await Bluebird.all([firstProcess, secondProcess, thirdProcess]) expect(processedVersions).to.eql(["1", "3"]) @@ -621,10 +622,9 @@ describe("task-graph", () => { const taskC = new TestTask(garden, "c", false) const tasks = [taskA, taskB, taskC, taskADep1, taskBDep, taskADep2] - - await graph.populateTaskDependencyCache(tasks) - const batches = graph.partition(tasks, { unlimitedConcurrency: false }) - const batchKeys = batches.map((b) => b.tasks.map((t) => t.getKey())) + const taskNodes = await graph.nodesWithDependencies(tasks, false) + const batches = graph.partition(taskNodes, { unlimitedConcurrency: false }) + const batchKeys = batches.map((b) => b.nodes.map((n) => n.getKey())) expect(batchKeys).to.eql([["a", "a-dep1", "a-dep2"], ["b", "b-dep"], ["c"]]) }) @@ -667,11 +667,9 @@ describe("task-graph", () => { taskC, ] - await graph.populateTaskDependencyCache(tasks) - const batches = graph.partition(tasks, { - unlimitedConcurrency: false, - }) - const batchKeys = batches.map((b) => b.tasks.map((t) => t.getKey())) + const taskNodes = await graph.nodesWithDependencies(tasks, false) + const batches = graph.partition(taskNodes, { unlimitedConcurrency: false }) + const batchKeys = batches.map((b) => b.nodes.map((n) => n.getKey())) expect(batchKeys).to.eql([ ["a-v1", "a-v1-dep1", "a-v1-dep2"],