From 1a2f69aebeeddadc5eef423346b2bcdd29fb09f9 Mon Sep 17 00:00:00 2001 From: Thorarinn Sigurdsson Date: Mon, 20 Jan 2020 12:47:43 +0100 Subject: [PATCH] feat(core): improved task graph concurrency This commit adds batch-based concurrency to the task graph. When TaskGraph's process method is called, the requested tasks are grouped into batches such that the tasks in each batch share one or more keys (including the keys of their dependencies). These batches are then queued. In each iteration of the task graph's main loop, any pending batches that share no keys with currently in-progress batches are added to the graph. This enables e.g. hot reload tasks to be run concurrently with build and test tasks for their underlying modules, which was one of the primary motivators behind this change. Also replaced the concurrencyLimit option for TaskGraph's process method with an unlimitedConcurrency option. This is useful e.g. when resolving providers. Task nodes from batches with unlimited concurrency are processed regardless of the task graph's normal concurrency limit. 
--- garden-service/src/events.ts | 4 + garden-service/src/garden.ts | 4 +- garden-service/src/task-graph.ts | 511 +++++++++++++++------ garden-service/src/util/util.ts | 42 +- garden-service/test/helpers.ts | 16 - garden-service/test/unit/src/task-graph.ts | 191 +++++++- garden-service/test/unit/src/util/util.ts | 18 + package.json | 3 +- 8 files changed, 621 insertions(+), 168 deletions(-) diff --git a/garden-service/src/events.ts b/garden-service/src/events.ts index 7d0008b9d5..c055ca3ac7 100644 --- a/garden-service/src/events.ts +++ b/garden-service/src/events.ts @@ -62,6 +62,10 @@ export type Events = { configRemoved: { path: string } + internalError: { + timestamp: Date + error: Error + } projectConfigChanged: {} moduleConfigChanged: { names: string[] diff --git a/garden-service/src/garden.ts b/garden-service/src/garden.ts index 754f368a13..3b7f64df21 100644 --- a/garden-service/src/garden.ts +++ b/garden-service/src/garden.ts @@ -506,9 +506,7 @@ export class Garden { }) // Process as many providers in parallel as possible - const taskResults = await this.processTasks(tasks, { - concurrencyLimit: tasks.length, - }) + const taskResults = await this.processTasks(tasks, { unlimitedConcurrency: true }) const failed = Object.values(taskResults).filter((r) => r && r.error) diff --git a/garden-service/src/task-graph.ts b/garden-service/src/task-graph.ts index 8b7930e2a9..061cf0e171 100644 --- a/garden-service/src/task-graph.ts +++ b/garden-service/src/task-graph.ts @@ -7,11 +7,10 @@ */ import Bluebird from "bluebird" -import PQueue from "p-queue" import chalk from "chalk" import yaml from "js-yaml" import hasAnsi = require("has-ansi") -import { flatten, merge, padEnd, pick } from "lodash" +import { every, flatten, intersection, merge, padEnd, union, uniqWith, without } from "lodash" import { BaseTask, TaskDefinitionError, TaskType } from "./tasks/base" import { LogEntry, LogEntryMetadata, TaskLogStatus } from "./logger/log-entry" @@ -19,6 +18,7 @@ import { 
toGardenError, GardenBaseError } from "./exceptions" import { Garden } from "./garden" import { dedent } from "./util/string" import uuid from "uuid" +import { defer, relationshipClasses } from "./util/util" class TaskGraphError extends GardenBaseError { type = "task-graph" @@ -31,8 +31,8 @@ export interface TaskResult { name: string output?: any dependencyResults?: TaskResults - completedAt: Date batchId: string + completedAt?: Date error?: Error } @@ -50,8 +50,8 @@ const concurrencyFromEnv = process.env.GARDEN_TASK_CONCURRENCY_LIMIT export const defaultTaskConcurrency = (concurrencyFromEnv && parseInt(concurrencyFromEnv, 10)) || DEFAULT_CONCURRENCY export interface ProcessTasksOpts { - concurrencyLimit?: number throwOnError?: boolean + unlimitedConcurrency?: boolean } export class TaskGraph { @@ -59,6 +59,9 @@ export class TaskGraph { private index: TaskNodeMap private inProgress: TaskNodeMap + private pendingBatches: TaskBatch[] + private inProgressBatches: TaskBatch[] + /** * latestTasks[key] is the most recently requested task (via process) for that key. * We use this table to ensure that the last requested task version is used as @@ -73,43 +76,41 @@ export class TaskGraph { * A given task instance (uniquely identified by its id) should always return the same * list of dependencies (by key) from its getDependencies method. 
*/ - private taskDependencyCache: { [id: string]: Set } // sets of keys + private taskDependencyCache: { [id: string]: BaseTask[] } private resultCache: ResultCache - private opQueue: PQueue constructor(private garden: Garden, private log: LogEntry) { this.roots = new TaskNodeMap() this.index = new TaskNodeMap() this.inProgress = new TaskNodeMap() + this.pendingBatches = [] + this.inProgressBatches = [] this.latestTasks = {} this.pendingKeys = new Set() this.taskDependencyCache = {} this.resultCache = new ResultCache() - this.opQueue = new PQueue({ concurrency: 1 }) this.logEntryMap = {} } async process(tasks: BaseTask[], opts?: ProcessTasksOpts): Promise { - // We generate a new batchId - const batchId = uuid.v4() - for (const t of tasks) { this.latestTasks[t.getKey()] = t } - // We want at most one pending (i.e. not in-progress) task for a given key at any given time, - // so we deduplicate here. - const tasksToProcess = tasks.filter((t) => !this.pendingKeys.has(t.getKey())) - for (const t of tasksToProcess) { - this.pendingKeys.add(t.getKey()) - } - - // Regardless of whether it was added by this call to this.processTasksInternal, we want - // to return the latest result for each requested task. - const resultKeys = tasks.map((t) => t.getKey()) + await this.populateTaskDependencyCache(tasks) + const unlimitedConcurrency = opts ? !!opts.unlimitedConcurrency : false + const batches = this.partition(tasks, { unlimitedConcurrency }) + this.pendingBatches.push(...batches) + this.processGraph() - const results = await this.opQueue.add(() => this.processTasksInternal(batchId, tasksToProcess, resultKeys, opts)) + /** + * Since partitioned batches don't share any result keys, we can safely merge their results. + * + * Note that these promises will never throw errors, since all errors in async code related + * to processing tasks are caught in processNode and stored on that task's result.error. 
+ */ + const results: TaskResults = merge({}, ...(await Bluebird.map(batches, (b) => b.promise))) if (opts && opts.throwOnError) { const failed = Object.entries(results).filter(([_, result]) => result && result.error) @@ -128,21 +129,58 @@ export class TaskGraph { return results } + /** + * Returns an array of TaskBatches, where each batch consists of tasks that share one or more dependencies (or are + * a dependency of another task in their batch). + * + * Also deduplicates tasks by key + version. + */ + partition(tasks: BaseTask[], { unlimitedConcurrency = false }): TaskBatch[] { + const deduplicatedTasks = uniqWith(tasks, (t1, t2) => { + return t1.getKey() === t2.getKey() && t1.version.versionString === t2.version.versionString + }) + + const tasksWithKeys = deduplicatedTasks.map((task) => { + return { task, resultKeys: this.keysWithDependencies(task) } + }) + + const sharesDeps = (task1withKeys, task2withKeys) => { + return intersection(task1withKeys.resultKeys, task2withKeys.resultKeys).length > 0 + } + + return relationshipClasses(tasksWithKeys, sharesDeps).map((cls) => { + const tasksForBatch = cls.map((t) => t.task) + const resultKeys: string[] = union(...cls.map((ts) => ts.resultKeys)) + return new TaskBatch(tasksForBatch, resultKeys, unlimitedConcurrency) + }) + } + + /** + * Populates this.taskDependencyCache for tasks and their dependencies, recursively. + */ + async populateTaskDependencyCache(tasks: BaseTask[]): Promise { + await Bluebird.map(tasks, async (task) => { + const deps = await task.getDependencies() + this.taskDependencyCache[task.getId()] = deps + await this.populateTaskDependencyCache(deps) + }) + } + /** * Rebuilds the dependency relationships between the TaskNodes in this.index, and updates this.roots accordingly. */ private rebuild() { const taskNodes = this.index.getNodes() - // this.taskDependencyCache will already have been populated at this point (happens in addTaskInternal). 
+ // this.taskDependencyCache will already have been populated at this point (happens in process). for (const node of taskNodes) { /** * We set the list of dependency nodes to the intersection of the set of nodes in this.index with * the node's task's dependencies (from configuration). */ node.clear() - const taskDeps = this.taskDependencyCache[node.getId()] || new Set() - node.setDependencies(taskNodes.filter((n) => taskDeps.has(n.getKey()))) + const taskDeps = this.taskDependencyCache[node.getId()] + node.setDependencies(taskNodes.filter((n) => taskDeps.find((d) => d.getKey() === n.getKey()))) } const newRootNodes = taskNodes.filter((n) => n.getDependencies().length === 0) @@ -150,8 +188,8 @@ export class TaskGraph { this.roots.setNodes(newRootNodes) } - private async addTask(batchId: string, task: BaseTask) { - await this.addNodeWithDependencies(task) + private addTask(batchId: string, task: BaseTask, unlimitedConcurrency: boolean) { + this.addNodeWithDependencies(task, batchId, unlimitedConcurrency) this.rebuild() if (this.index.getNode(task)) { this.garden.events.emit("taskPending", { @@ -169,7 +207,7 @@ export class TaskGraph { } } - private getNode(task: BaseTask): TaskNode | null { + private getNode(task: BaseTask, batchId: string, unlimitedConcurrency: boolean): TaskNode | null { const id = task.getId() const key = task.getKey() const existing = this.index @@ -181,145 +219,145 @@ export class TaskGraph { // A task with the same key is already pending. return existing } else { - const cachedResultExists = !!this.resultCache.get(task.getKey(), task.version.versionString) - if (cachedResultExists && !task.force) { + const cachedResult = this.resultCache.get(task.getKey(), task.version.versionString) + if (cachedResult && !task.force) { // No need to add task or its dependencies. 
+ const dependencyResults = this.keysWithDependencies(task) + .map((k) => this.resultCache.getNewest(k)) + .filter(Boolean) + this.provideCachedResultToInProgressBatches(cachedResult, dependencyResults) return null } else { - return new TaskNode(task) + return new TaskNode(task, batchId, unlimitedConcurrency) } } } /** - * Process the graph until it's complete. + * This method implements the graph's main processing loop. + * + * The calls to this.processNode will result in further calls to this.processGraph, eventually resulting in all + * requested tasks being processed. */ - private async processTasksInternal( - batchId: string, - tasks: BaseTask[], - resultKeys: string[], - opts?: ProcessTasksOpts - ): Promise { - const { concurrencyLimit = defaultTaskConcurrency } = opts || {} + private processGraph() { + const concurrencyLimit = defaultTaskConcurrency - for (const task of tasks) { - await this.addTask(batchId, this.latestTasks[task.getKey()]) - } + if (this.index.length === 0 && this.inProgressBatches.length === 0 && this.pendingBatches.length > 0) { + this.log.silly("") + this.log.silly("TaskGraph: this.index before processing") + this.log.silly("---------------------------------------") + this.log.silly(yaml.safeDump(this.index.inspect(), { noRefs: true, skipInvalid: true })) - this.log.silly("") - this.log.silly("TaskGraph: this.index before processing") - this.log.silly("---------------------------------------") - this.log.silly(yaml.safeDump(this.index.inspect(), { noRefs: true, skipInvalid: true })) + this.garden.events.emit("taskGraphProcessing", { startedAt: new Date() }) + } - const _this = this - const results: TaskResults = {} + while (this.pickDisjointPendingBatches().length > 0) { + this.addPendingBatches() + } - this.garden.events.emit("taskGraphProcessing", { startedAt: new Date() }) + if (this.index.length === 0 && this.pendingBatches.length === 0 && this.inProgressBatches.length === 0) { + // done! 
+ this.logEntryMap.counter && this.logEntryMap.counter.setDone({ symbol: "info" }) + this.garden.events.emit("taskGraphComplete", { completedAt: new Date() }) + return + } - const loop = async () => { - if (_this.index.length === 0) { - // done! - this.logEntryMap.counter && this.logEntryMap.counter.setDone({ symbol: "info" }) - this.garden.events.emit("taskGraphComplete", { completedAt: new Date() }) - return - } + const pendingRoots = this.roots.getNodes().filter((n) => !this.inProgress.contains(n)) + const pendingWithUnlimitedConcurrency = pendingRoots.filter((n) => n.unlimitedConcurrency) + const pendingWithLimitedConcurrency = pendingRoots.filter((n) => !n.unlimitedConcurrency) - const batch = _this.roots - .getNodes() - .filter((n) => !this.inProgress.contains(n)) - .slice(0, concurrencyLimit - this.inProgress.length) - - batch.forEach((n) => this.inProgress.addNode(n)) - this.rebuild() - - this.initLogging() - - return Bluebird.map(batch, async (node: TaskNode) => { - const task = node.task - const name = task.getName() - const type = node.getType() - const key = node.getKey() - const description = node.getDescription() - - let result: TaskResult - let success = true - - try { - this.logTask(node) - this.logEntryMap.inProgress.setState(inProgressToStr(this.inProgress.getNodes())) - - const dependencyBaseKeys = (await task.getDependencies()).map((dep) => dep.getKey()) - - const dependencyResults = merge(this.resultCache.pick(dependencyBaseKeys), pick(results, dependencyBaseKeys)) - - try { - this.pendingKeys.delete(task.getKey()) - this.garden.events.emit("taskProcessing", { - name, - type, - key, - startedAt: new Date(), - batchId, - version: task.version, - }) - result = await node.process(dependencyResults, batchId) - - this.garden.events.emit("taskComplete", result) - } catch (error) { - success = false - result = { type, description, key, name, error, completedAt: new Date(), batchId } - this.garden.events.emit("taskError", result) - 
this.logTaskError(node, error) - this.cancelDependants(batchId, node) - } finally { - // We know the result got assigned in either the try or catch clause - results[key] = result! - this.resultCache.put(key, task.version.versionString, result!) - } - } finally { - this.completeTask(node, success) - } - - return loop() - }) - } + const nodesToProcess = [ + ...pendingWithUnlimitedConcurrency, + ...pendingWithLimitedConcurrency.slice(0, concurrencyLimit - this.inProgress.length), + ] - await loop() + nodesToProcess.forEach((n) => this.inProgress.addNode(n)) this.rebuild() + this.initLogging() - for (const resultKey of resultKeys) { - if (!results[resultKey]) { - // We know there's a cached result for resultKey, since each key in resultKeys - // corresponds to a task that was processed during this run of processTasks, or - // during a previous run of processTasks. See the process method above for details. - results[resultKey] = this.resultCache.getNewest(resultKey)! - } + for (const node of nodesToProcess) { + this.pendingKeys.delete(node.getKey()) + } + for (const node of nodesToProcess) { + this.processNode(node).catch((error) => { + this.garden.events.emit("internalError", { error, timestamp: new Date() }) + this.logInternalError(node, error) + this.cancelDependants(node.batchId, node) + }) } - return results + this.rebuild() } - private addNode(task: BaseTask): TaskNode | null { - const node = this.getNode(task) + private addNode(task: BaseTask, batchId: string, unlimitedConcurrency: boolean): TaskNode | null { + const node = this.getNode(task, batchId, unlimitedConcurrency) if (node) { this.index.addNode(node) } return node } - private async addNodeWithDependencies(task: BaseTask) { - const node = this.addNode(task) + private addNodeWithDependencies(task: BaseTask, batchId: string, unlimitedConcurrency: boolean) { + const node = this.addNode(task, batchId, unlimitedConcurrency) if (node) { - const depTasks = await node.task.getDependencies() - 
this.taskDependencyCache[node.getId()] = new Set(depTasks.map((d) => d.getKey())) + const depTasks = this.taskDependencyCache[node.getId()] for (const dep of depTasks) { - await this.addNodeWithDependencies(dep) + this.addNodeWithDependencies(dep, batchId, unlimitedConcurrency) } } } + /** + * Processes a single TaskNode to completion, handling errors and providing its result to in-progress task batches. + */ + private async processNode(node: TaskNode) { + let success = true + // Errors thrown in this outer try block are caught in processGraph. + try { + const task = node.task + const name = task.getName() + const type = node.getType() + const key = node.getKey() + const batchId = node.batchId + const description = node.getDescription() + + let result: TaskResult = { type, description, key: task.getKey(), name: task.getName(), batchId } + + this.logTask(node) + this.logEntryMap.inProgress.setState(inProgressToStr(this.inProgress.getNodes())) + + const dependencyBaseKeys = this.taskDependencyCache[task.getId()].map((dep) => dep.getKey()) + const dependencyResults = this.resultCache.pick(dependencyBaseKeys) + + try { + this.garden.events.emit("taskProcessing", { + name, + type, + key, + batchId, + startedAt: new Date(), + version: task.version, + }) + result = await node.process(dependencyResults, node.batchId) + this.garden.events.emit("taskComplete", result) + } catch (error) { + success = false + result = { type, description, key, name, error, completedAt: new Date(), batchId } + this.garden.events.emit("taskError", result) + this.logTaskError(node, error) + this.cancelDependants(batchId, node) + } finally { + this.resultCache.put(key, task.version.versionString, result) + this.provideResultToInProgressBatches(result) + } + } finally { + this.completeTask(node, success) + this.processGraph() + } + } + private completeTask(node: TaskNode, success: boolean) { if (node.getDependencies().length > 0) { throw new TaskGraphError(`Task ${node.getId()} still has 
unprocessed dependencies`, { node }) @@ -336,7 +374,9 @@ export class TaskGraph { this.pendingKeys.delete(node.getKey()) } - // Recursively remove node's dependants, without removing node. + /** + * Recursively remove node's dependants, without removing node. + */ private cancelDependants(batchId: string, node: TaskNode) { const cancelledAt = new Date() for (const dependant of this.getDependants(node)) { @@ -349,6 +389,7 @@ export class TaskGraph { batchId, }) this.remove(dependant) + this.cancelKeyForInProgressBatches(dependant.getKey()) } this.rebuild() } @@ -360,6 +401,94 @@ export class TaskGraph { return dependants.concat(flatten(dependants.map((d) => this.getDependants(d)))) } + private addPendingBatches() { + const batches = this.pickDisjointPendingBatches() + this.pendingBatches = without(this.pendingBatches, ...batches) + this.inProgressBatches.push(...batches) + for (const batch of batches) { + /** + * We want at most one pending (i.e. not in-progress) task for a given key at any given time, + * so we deduplicate here. + */ + const tasksToProcess = batch.tasks.filter((t) => !this.pendingKeys.has(t.getKey())) + for (const task of tasksToProcess) { + this.addTask(batch.id, this.latestTasks[task.getKey()], batch.unlimitedConcurrency) + } + } + + this.rebuild() + } + + /** + * Find any pending task batches that are disjoint with all in-progress batches, and mutually disjoint among + * themselves (preferring to add older batches first, i.e. lower-indexed in this.pendingBatches). 
+ */ + private pickDisjointPendingBatches(): TaskBatch[] { + const pickedBatches: TaskBatch[] = [] + + const disjointFromAll = (batches: TaskBatch[], candidate: TaskBatch) => every(batches, (b) => b.disjoint(candidate)) + + for (const pending of this.pendingBatches) { + if (disjointFromAll(this.inProgressBatches, pending) && disjointFromAll(pickedBatches, pending)) { + pickedBatches.push(pending) + } + } + + return pickedBatches + } + + private provideResultToInProgressBatches(result: TaskResult) { + const finished: TaskBatch[] = [] + for (const batch of this.inProgressBatches) { + const batchFinished = batch.taskFinished(result) + if (batchFinished) { + finished.push(batch) + } + } + this.inProgressBatches = without(this.inProgressBatches, ...finished) + } + + private provideCachedResultToInProgressBatches(result: TaskResult, depResults: TaskResult[]) { + const finished: TaskBatch[] = [] + for (const batch of this.inProgressBatches) { + const batchFinished = batch.taskCached(result, depResults) + if (batchFinished) { + finished.push(batch) + } + } + this.inProgressBatches = without(this.inProgressBatches, ...finished) + } + + private cancelKeyForInProgressBatches(key: string) { + const finished: TaskBatch[] = [] + for (const batch of this.inProgressBatches) { + const batchFinished = batch.cancelKey(key) + if (batchFinished) { + finished.push(batch) + } + } + this.inProgressBatches = without(this.inProgressBatches, ...finished) + } + + /** + * Returns the keys of task and its dependencies, recursively. + * + * Expects this.taskDependencyCache to have been populated for tasks and their dependencies. 
+ */ + private keysWithDependencies(task: BaseTask): string[] { + const keySet = new Set() + + const getKeys = (t: BaseTask, keys: Set) => { + keys.add(t.getKey()) + for (const dep of this.taskDependencyCache[t.getId()]) { + getKeys(dep, keys) + } + } + + getKeys(task, keySet) + return [...keySet] + } + // Logging private logTask(node: TaskNode) { const entry = this.log.debug({ @@ -405,13 +534,23 @@ export class TaskGraph { } } - private logTaskError(node: TaskNode, err) { + private logTaskError(node: TaskNode, err: Error) { + const prefix = `Failed ${node.getDescription()}. Here is the output:` + this.logError(err, prefix) + } + + private logInternalError(node: TaskNode, err: Error) { + const prefix = `An internal error occurred while ${node.getDescription()}. Here is the output:` + this.logError(err, prefix) + } + + private logError(err: Error, errMessagePrefix: string) { const divider = padEnd("", 80, "━") const error = toGardenError(err) const errorMessage = error.message.trim() const msg = - chalk.red.bold(`\nFailed ${node.getDescription()}. Here is the output:\n${divider}\n`) + + chalk.red.bold(`\n${errMessagePrefix}\n${divider}\n`) + (hasAnsi(errorMessage) ? errorMessage : chalk.red(errorMessage)) + chalk.red.bold(`\n${divider}\n`) @@ -503,11 +642,15 @@ class TaskNodeMap { class TaskNode { task: BaseTask + batchId: string + unlimitedConcurrency: boolean private dependencies: TaskNodeMap - constructor(task: BaseTask) { + constructor(task: BaseTask, batchId: string, unlimitedConcurrency: boolean) { this.task = task + this.batchId = batchId + this.unlimitedConcurrency = unlimitedConcurrency this.dependencies = new TaskNodeMap() } @@ -574,9 +717,7 @@ class ResultCache { * By design, at most one TaskResult (the most recently processed) is cached for a given key. 
* * Invariant: No concurrent calls are made to this class' instance methods, since they - only happen within TaskGraph's addTaskInternal and processTasksInternal methods, - which are never executed concurrently, since they are executed sequentially by the - operation queue. + * only happen within TaskGraph's processGraph method, which is never executed concurrently. */ private cache: { [key: string]: CachedResult } @@ -613,6 +754,102 @@ class ResultCache { } } +export class TaskBatch { + public id: string + public tasks: BaseTask[] + public unlimitedConcurrency: boolean + /** + * The keys of tasks and their dependencies, recursively. + * + * We want to return results for all these keys, regardless of whether there's a cached result or an + * already pending task for a given key. + */ + public resultKeys: string[] + public remainingResultKeys: Set + public results: TaskResults + public promise: Promise + private resolver: any + + /** + * resultKeys should be the set union of the keys of tasks and those of their dependencies, recursively. + */ + constructor(tasks: BaseTask[], resultKeys: string[], unlimitedConcurrency = false) { + this.id = uuid.v4() + this.tasks = tasks + this.unlimitedConcurrency = unlimitedConcurrency + this.resultKeys = resultKeys + this.remainingResultKeys = new Set(resultKeys) + this.results = {} + const { promise, resolver } = defer() + this.promise = promise + this.resolver = resolver + } + + disjoint(otherBatch: TaskBatch): boolean { + return intersection(this.resultKeys, otherBatch.resultKeys).length === 0 + } + + /** + * Should be called when a task finishes processing and this batch is in progress. + * + * Returns true if this call finishes the batch. 
+ */ + taskFinished(result: TaskResult): boolean { + const key = result.key + if (!this.remainingResultKeys.has(key)) { + return false + } + this.results[key] = result + this.remainingResultKeys.delete(key) + if (this.remainingResultKeys.size === 0) { + this.resolver(this.results) + return true + } else { + return false + } + } + + /** + * Should be called when a task result was read from cache, and this batch is in progress. + * + * Returns true if this call finishes the batch. + */ + taskCached(result: TaskResult, depResults: TaskResult[]): boolean { + const key = result.key + this.results[key] = result + this.remainingResultKeys.delete(key) + for (const depResult of depResults) { + this.results[depResult.key] = depResult + this.remainingResultKeys.delete(depResult.key) + } + if (this.remainingResultKeys.size === 0) { + this.resolver(this.results) + return true + } else { + return false + } + } + + /** + * Should be called when the task for the given key was cancelled because one of its dependencies threw an error + * during processing, and this batch is in progress. + * + * Returns true if this call finishes the batch. 
+ */ + cancelKey(key: string): boolean { + if (!this.remainingResultKeys.has(key)) { + return false + } + this.remainingResultKeys.delete(key) + if (this.remainingResultKeys.size === 0) { + this.resolver(this.results) + return true + } else { + return false + } + } +} + interface LogEntryMap { [key: string]: LogEntry } diff --git a/garden-service/src/util/util.ts b/garden-service/src/util/util.ts index df4eb3e6e4..7719619f25 100644 --- a/garden-service/src/util/util.ts +++ b/garden-service/src/util/util.ts @@ -17,7 +17,7 @@ import _spawn from "cross-spawn" import { readFile, writeFile } from "fs-extra" import { find, pick, difference, fromPairs, uniqBy } from "lodash" import { TimeoutError, ParameterError, RuntimeError, GardenError } from "../exceptions" -import { isArray, isPlainObject, extend, mapValues, pickBy } from "lodash" +import { isArray, isPlainObject, extend, mapValues, pickBy, range, some } from "lodash" import highlight from "cli-highlight" import chalk from "chalk" import { safeDump } from "js-yaml" @@ -75,6 +75,20 @@ export async function sleep(msec) { return new Promise((resolve) => setTimeout(resolve, msec)) } +/** + * Returns a promise that can be resolved/rejected by calling resolver/rejecter. + */ +export function defer() { + let outerResolve + let outerReject + const promise = new Promise((res, rej) => { + outerResolve = res + outerReject = rej + }) + + return { promise, resolver: outerResolve, rejecter: outerReject } +} + /** * Extracting to a separate function so that we can test output streams */ @@ -458,6 +472,32 @@ export function uniqByName(array: T[]): T[] { return uniqBy(array, (item) => item.name) } +/** + * Returns an array of arrays (classes). Each item is added to every existing class that contains at least one + * element for which isRelated returns true, or starts a new class of its own if there is no such class. + * + * NOTE(review): classes are never merged, so an item related to several classes appears in each - confirm intended. 
+ */ +export function relationshipClasses(items: I[], isRelated: (item1: I, item2: I) => boolean): I[][] { + const classes: I[][] = [] + for (const item of items) { + let found = false + for (const classIndex of range(0, classes.length)) { + const cls = classes[classIndex] + if (cls && cls.length && some(cls, (classItem) => isRelated(classItem, item))) { + found = true + cls.push(item) + } + } + + if (!found) { + classes.push([item]) + } + } + + return classes +} + /** * Converts a string identifier to the appropriate casing and style for use in environment variable names. * (e.g. "my-service" -> "MY_SERVICE") diff --git a/garden-service/test/helpers.ts b/garden-service/test/helpers.ts index 924c2959f4..c9dd92f278 100644 --- a/garden-service/test/helpers.ts +++ b/garden-service/test/helpers.ts @@ -373,22 +373,6 @@ export function stubModuleAction { - outerResolve = res - outerReject = rej - }) - - return { promise, resolver: outerResolve, rejecter: outerReject } -} - export async function expectError(fn: Function, typeOrCallback?: string | ((err: any) => void)) { try { await fn() diff --git a/garden-service/test/unit/src/task-graph.ts b/garden-service/test/unit/src/task-graph.ts index 693f803849..1ebaf71c6b 100644 --- a/garden-service/test/unit/src/task-graph.ts +++ b/garden-service/test/unit/src/task-graph.ts @@ -11,9 +11,9 @@ import { join } from "path" import { expect } from "chai" import { BaseTask, TaskType } from "../../../src/tasks/base" import { TaskGraph, TaskResult, TaskResults } from "../../../src/task-graph" -import { makeTestGarden, freezeTime, dataDir, defer } from "../../helpers" +import { makeTestGarden, freezeTime, dataDir, expectError } from "../../helpers" import { Garden } from "../../../src/garden" -import { deepFilter } from "../../../src/util/util" +import { deepFilter, defer } from "../../../src/util/util" import uuid from "uuid" const projectRoot = join(dataDir, "test-project-empty") @@ -90,11 +90,11 @@ class TestTask extends BaseTask { } 
describe("task-graph", () => { - describe("TaskGraph", () => { - async function getGarden() { - return makeTestGarden(projectRoot) - } + async function getGarden() { + return makeTestGarden(projectRoot) + } + describe("TaskGraph", () => { it("should successfully process a single task without dependencies", async () => { const now = freezeTime() const garden = await getGarden() @@ -134,6 +134,7 @@ describe("task-graph", () => { const generatedBatchId = result?.a?.batchId || uuid.v4() expect(garden.events.eventLog).to.eql([ + { name: "taskGraphProcessing", payload: { startedAt: now } }, { name: "taskPending", payload: { @@ -144,7 +145,6 @@ describe("task-graph", () => { type: task.type, }, }, - { name: "taskGraphProcessing", payload: { startedAt: now } }, { name: "taskProcessing", payload: { @@ -181,6 +181,7 @@ describe("task-graph", () => { const generatedBatchId = results?.a?.batchId || uuid.v4() expect(garden.events.eventLog).to.eql([ + { name: "taskGraphProcessing", payload: { startedAt: now } }, { name: "taskComplete", payload: { @@ -194,7 +195,6 @@ describe("task-graph", () => { output: { dependencyResults: {}, result: "result-a" }, }, }, - { name: "taskGraphProcessing", payload: { startedAt: now } }, { name: "taskGraphComplete", payload: { completedAt: now } }, ]) }) @@ -210,6 +210,7 @@ describe("task-graph", () => { const generatedBatchId = result?.a?.batchId || uuid.v4() expect(garden.events.eventLog).to.eql([ + { name: "taskGraphProcessing", payload: { startedAt: now } }, { name: "taskPending", payload: { @@ -220,7 +221,6 @@ describe("task-graph", () => { type: task.type, }, }, - { name: "taskGraphProcessing", payload: { startedAt: now } }, { name: "taskProcessing", payload: { @@ -250,6 +250,31 @@ describe("task-graph", () => { expect(taskError && taskError.payload["error"]).to.exist }) + it("should throw on task error if throwOnError is set", async () => { + const garden = await getGarden() + const graph = new TaskGraph(garden, garden.log) + const task = 
new TestTask(garden, "a", false, { throwError: true }) + + await expectError( + () => graph.process([task], { throwOnError: true }), + (err) => expect(err.message).to.include("task(s) failed") + ) + }) + + it("should include any task errors in task results", async () => { + const garden = await getGarden() + const graph = new TaskGraph(garden, garden.log) + const taskA = new TestTask(garden, "a", false, { throwError: true }) + const taskB = new TestTask(garden, "b", false, { throwError: true }) + const taskC = new TestTask(garden, "c", false) + + const results = await graph.process([taskA, taskB, taskC]) + + expect(results.a!.error).to.exist + expect(results.b!.error).to.exist + expect(results.c!.error).to.not.exist + }) + it("should process multiple tasks in dependency order", async () => { const now = freezeTime() const garden = await getGarden() @@ -428,6 +453,78 @@ describe("task-graph", () => { expect(processedVersions).to.eql(["1", "3"]) }) + it("should process requests with unrelated tasks concurrently", async () => { + const garden = await getGarden() + const graph = new TaskGraph(garden, garden.log) + + const resultOrder: string[] = [] + + const callback = async (key: string) => { + resultOrder.push(key) + } + + const { resolver: aStartedResolver } = defer() + const { promise: aDonePromise, resolver: aDoneResolver } = defer() + + const opts = { callback } + const taskADep1 = new TestTask(garden, "a-dep1", false, { ...opts }) + const taskADep2 = new TestTask(garden, "a-dep2", false, { ...opts }) + + const taskA = new TestTask(garden, "a", false, { + dependencies: [taskADep1, taskADep2], + callback: async () => { + aStartedResolver() + resultOrder.push("a") + await aDonePromise + }, + }) + + const taskBDep = new TestTask(garden, "b-dep", false, { ...opts }) + const taskB = new TestTask(garden, "b", false, { ...opts, dependencies: [taskBDep] }) + const taskC = new TestTask(garden, "c", false, { ...opts }) + + const firstProcess = graph.process([taskA, 
taskADep1, taskADep2]) + const secondProcess = graph.process([taskB, taskBDep]) + const thirdProcess = graph.process([taskC]) + aDoneResolver() + await Bluebird.all([firstProcess, secondProcess, thirdProcess]) + expect(resultOrder).to.eql(["c", "a-dep1", "a-dep2", "b-dep", "a", "b"]) + }) + + it("should process two requests with related tasks sequentially", async () => { + const garden = await getGarden() + const graph = new TaskGraph(garden, garden.log) + + const resultOrder: string[] = [] + + const callback = async (key: string) => { + resultOrder.push(key) + } + + const { resolver: aStartedResolver } = defer() + const { promise: aDonePromise, resolver: aDoneResolver } = defer() + + const opts = { callback } + const taskADep = new TestTask(garden, "a-dep1", true, { ...opts }) + + const taskA = new TestTask(garden, "a", true, { + dependencies: [taskADep], + callback: async () => { + aStartedResolver() + resultOrder.push("a") + await aDonePromise + }, + }) + + const repeatTaskBDep = new TestTask(garden, "b-dep", true, { ...opts }) + + const firstProcess = graph.process([taskA, taskADep]) + const secondProcess = graph.process([repeatTaskBDep]) + aDoneResolver() + await Bluebird.all([firstProcess, secondProcess]) + expect(resultOrder).to.eql(["b-dep", "a-dep1", "a"]) + }) + it("should recursively cancel a task's dependants when it throws an error", async () => { const now = freezeTime() const garden = await getGarden() @@ -481,11 +578,11 @@ describe("task-graph", () => { expect(results.b).to.have.property("error") expect(resultOrder).to.eql(["a", "b"]) expect(filteredEventLog).to.eql([ + { name: "taskGraphProcessing", payload: {} }, { name: "taskPending", payload: { key: "a", name: "a", type: "test", batchId: generatedBatchId } }, { name: "taskPending", payload: { key: "b", name: "b", type: "test", batchId: generatedBatchId } }, { name: "taskPending", payload: { key: "c", name: "c", type: "test", batchId: generatedBatchId } }, { name: "taskPending", payload: { key: 
"d", name: "d", type: "test", batchId: generatedBatchId } }, - { name: "taskGraphProcessing", payload: {} }, { name: "taskProcessing", payload: { key: "a", name: "a", type: "test", batchId: generatedBatchId } }, { name: "taskComplete", @@ -510,5 +607,79 @@ describe("task-graph", () => { { name: "taskGraphComplete", payload: {} }, ]) }) + + describe("partition", () => { + it("should partition a task list into unrelated batches", async () => { + const garden = await getGarden() + const graph = new TaskGraph(garden, garden.log) + + const taskADep1 = new TestTask(garden, "a-dep1", false) + const taskADep2 = new TestTask(garden, "a-dep2", false) + const taskA = new TestTask(garden, "a", false, { dependencies: [taskADep1, taskADep2] }) + const taskBDep = new TestTask(garden, "b-dep", false) + const taskB = new TestTask(garden, "b", false, { dependencies: [taskBDep] }) + const taskC = new TestTask(garden, "c", false) + + const tasks = [taskA, taskB, taskC, taskADep1, taskBDep, taskADep2] + + await graph.populateTaskDependencyCache(tasks) + const batches = graph.partition(tasks, { unlimitedConcurrency: false }) + const batchKeys = batches.map((b) => b.tasks.map((t) => t.getKey())) + + expect(batchKeys).to.eql([["a", "a-dep1", "a-dep2"], ["b", "b-dep"], ["c"]]) + }) + + it("should correctly deduplicate and partition tasks by key and version", async () => { + const garden = await getGarden() + const graph = new TaskGraph(garden, garden.log) + + // Version 1 of task A, and its dependencies + const taskAv1Dep1 = new TestTask(garden, "a-v1-dep1", false) + const taskAv1Dep2 = new TestTask(garden, "a-v1-dep2", false) + const taskAv1 = new TestTask(garden, "a-v1", false, { dependencies: [taskAv1Dep1, taskAv1Dep2] }) + + // Version 2 of task A, and its dependencies + const taskAv2Dep1 = new TestTask(garden, "a-v2-dep1", false) + const taskAv2Dep2 = new TestTask(garden, "a-v2-dep2", false) + const taskAv2 = new TestTask(garden, "a-v2", false, { dependencies: [taskAv2Dep1, 
taskAv2Dep2] }) + + // A duplicate of task A at version 1, and its dependencies + const dupTaskAv1Dep1 = new TestTask(garden, "a-v1-dep1", false) + const dupTaskAv1Dep2 = new TestTask(garden, "a-v1-dep2", false) + const dupTaskAv1 = new TestTask(garden, "a-v1", false, { dependencies: [dupTaskAv1Dep1, dupTaskAv1Dep2] }) + + const taskBDep = new TestTask(garden, "b-dep", false) + const taskB = new TestTask(garden, "b", false, { dependencies: [taskBDep] }) + const taskC = new TestTask(garden, "c", false) + + const tasks = [ + taskAv1, + taskAv1Dep1, + taskAv1Dep2, + taskAv2, + taskAv2Dep1, + taskAv2Dep2, + dupTaskAv1, + dupTaskAv1Dep1, + dupTaskAv1Dep2, + taskB, + taskBDep, + taskC, + ] + + await graph.populateTaskDependencyCache(tasks) + const batches = graph.partition(tasks, { + unlimitedConcurrency: false, + }) + const batchKeys = batches.map((b) => b.tasks.map((t) => t.getKey())) + + expect(batchKeys).to.eql([ + ["a-v1", "a-v1-dep1", "a-v1-dep2"], + ["a-v2", "a-v2-dep1", "a-v2-dep2"], + ["b", "b-dep"], + ["c"], + ]) + }) + }) }) }) diff --git a/garden-service/test/unit/src/util/util.ts b/garden-service/test/unit/src/util/util.ts index d267d58492..8309a67772 100644 --- a/garden-service/test/unit/src/util/util.ts +++ b/garden-service/test/unit/src/util/util.ts @@ -8,6 +8,7 @@ import { expect } from "chai" import { describe } from "mocha" +import { includes } from "lodash" import { pickKeys, getEnvVarName, @@ -19,6 +20,7 @@ import { makeErrorMsg, renderOutputStream, spawn, + relationshipClasses, } from "../../../../src/util/util" import { expectError } from "../../../helpers" import { splitFirst } from "../../../../src/util/util" @@ -326,4 +328,20 @@ describe("util", () => { expect(splitLast("foo", ":")).to.eql(["", "foo"]) }) }) + + describe("relationshipClasses", () => { + it("should correctly partition related items", () => { + const items = ["ab", "b", "c", "a", "cd"] + const isRelated = (s1: string, s2: string) => includes(s1, s2) || includes(s2, s1) + 
expect(relationshipClasses(items, isRelated)).to.eql([ + ["ab", "b", "a"], + ["c", "cd"], + ]) + }) + + it("should return a single partition when only one item is passed", () => { + const isRelated = (s1: string, s2: string) => s1[0] === s2[0] + expect(relationshipClasses(["a"], isRelated)).to.eql([["a"]]) + }) + }) }) diff --git a/package.json b/package.json index 1c4ff16508..305aee5c14 100644 --- a/package.json +++ b/package.json @@ -67,5 +67,6 @@ "commit-msg": "commitlint -E HUSKY_GIT_PARAMS", "pre-push": "npm run check-all && npm test" } - } + }, + "dependencies": {} }