Skip to content

Commit

Permalink
fix: Added OperationQueue to TaskGraph.
Browse files Browse the repository at this point in the history
  • Loading branch information
thsig authored and edvald committed Apr 25, 2018
1 parent 8191aa8 commit ae79785
Show file tree
Hide file tree
Showing 16 changed files with 258 additions and 150 deletions.
18 changes: 13 additions & 5 deletions src/commands/auto-reload.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
/*
* Copyright (C) 2018 Garden Technologies, Inc. <[email protected]>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

import { keys, values } from "lodash"
import { Command } from "./base"
import { Module } from "../types/module"
import { GardenContext } from "../context"
import { FSWatcher } from "../fs-watcher"
import { PluginContext } from "../plugin-context"
import { BuildTask } from "../tasks/build"
import { DeployTask } from "../tasks/deploy"
import { registerCleanupFunction, sleep } from "../util"

export type AutoReloadDependants = { [key: string]: Set<Module> }

async function registerAutoReloadWatches(ctx: GardenContext): Promise<FSWatcher | null> {
async function registerAutoReloadWatches(ctx: PluginContext): Promise<FSWatcher | null> {
const allModules = values(await ctx.getModules())
const modules = allModules.filter((m) => !m.skipAutoReload)

Expand All @@ -25,7 +33,7 @@ async function registerAutoReloadWatches(ctx: GardenContext): Promise<FSWatcher

const autoReloadDependants = await computeAutoReloadDependants(modules)

const watcher = new FSWatcher(ctx)
const watcher = new FSWatcher(ctx.projectRoot)
watcher.watchModules(modules, "addTasksForAutoReload/",
async (changedModule, _) => {
ctx.log.info({ msg: `files changed for module ${changedModule.name}` })
Expand All @@ -50,7 +58,7 @@ export async function computeAutoReloadDependants(modules: Module[]):
return dependants
}

export async function addTasksForAutoReload(ctx: GardenContext, module: Module, dependants: AutoReloadDependants) {
export async function addTasksForAutoReload(ctx: PluginContext, module: Module, dependants: AutoReloadDependants) {
const serviceNames = keys(module.services || {})

if (serviceNames.length === 0) {
Expand All @@ -74,7 +82,7 @@ export class AutoReloadCommand extends Command {
name = "autoreload"
help = "Auto-reload modules when sources change"

async action(ctx: GardenContext): Promise<void> {
async action(ctx: PluginContext): Promise<void> {
const watcher = await registerAutoReloadWatches(ctx)

if (!watcher) {
Expand Down
27 changes: 20 additions & 7 deletions src/fs-watcher.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
/*
* Copyright (C) 2018 Garden Technologies, Inc. <[email protected]>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

import { map as bluebirdMap } from "bluebird"
import { Client } from "fb-watchman"
import { keyBy } from "lodash"
import { resolve } from "path"
import { Module } from "./types/module"
import { GardenContext } from "./context"

export type CapabilityOptions = { required?: string[], optional?: string[] }
export type CapabilityResponse = { error: Error, response: { capabilities: { string: boolean } } }
Expand All @@ -22,12 +29,10 @@ export type SubscriptionResponse = {
}

export class FSWatcher {
readonly ctx: GardenContext
private readonly client
private capabilityCheckComplete: boolean

constructor(ctx: GardenContext) {
this.ctx = ctx
constructor(private projectRoot: string) {
this.client = new Client()
this.capabilityCheckComplete = false
}
Expand All @@ -53,6 +58,7 @@ export class FSWatcher {
})
}

// WIP
async watchModules(modules: Module[], subscriptionPrefix: string,
changeHandler: (Module, SubscriptionResponse) => Promise<void>) {
if (!this.capabilityCheckComplete) {
Expand All @@ -63,10 +69,17 @@ export class FSWatcher {

await bluebirdMap(modules || [], async (module) => {
const subscriptionKey = FSWatcher.subscriptionKey(subscriptionPrefix, module)
const modulePath = resolve(this.ctx.projectRoot, module.path)
const modulePath = resolve(this.projectRoot, module.path)

const result = await this.command(["watch-project", modulePath])

const subscriptionRequest = {}
// console.log("watching", modulePath)

const subscriptionRequest = {
// expression: ["anyof",
// ["dirname", modulePath, ["depth", "ge", 0]]
// ]
}

await this.command([
"subscribe",
Expand All @@ -76,7 +89,7 @@ export class FSWatcher {
})

this.on("subscription", async (response) => {
console.log("file changed:", response)
// console.log("file changed:", response)
const changedModule = modulesBySubscriptionKey[response.subscription]
if (!changedModule) {
console.log("no module found for changed file, skipping auto-rebuild")
Expand Down
98 changes: 93 additions & 5 deletions src/task-graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import * as Bluebird from "bluebird"
import chalk from "chalk"
import { pick } from "lodash"
import { GardenContext } from "./context"
import { Task, TaskDefinitionError } from "./types/task"

import { EntryStyle, LogSymbolType } from "./logger/types"
Expand Down Expand Up @@ -48,16 +47,22 @@ export class TaskGraph {
private inProgress: TaskNodeMap
private logEntryMap: LogEntryMap

private opQueue: OperationQueue

constructor(private ctx: PluginContext, private concurrency: number = DEFAULT_CONCURRENCY) {
this.roots = new TaskNodeMap()
this.index = new TaskNodeMap()
this.inProgress = new TaskNodeMap()
this.opQueue = new OperationQueue(this)
this.logEntryMap = {}
}

async addTask(task: Task) {
// TODO: Detect circular dependencies.
addTask(task: Task): Promise<any> {
return this.opQueue.request({ type: "addTask", task })
}

async addTaskInternal(task: Task) {
// TODO: Detect circular dependencies.
const predecessor = this.getPredecessor(task)
let node = this.getNode(task)

Expand Down Expand Up @@ -96,10 +101,15 @@ export class TaskGraph {
return existing || new TaskNode(task)
}

processTasks(): Promise<TaskResults> {
return this.opQueue.request({ type: "processTasks" })
}

/*
Process the graph until it's complete
*/
async processTasks(): Promise<TaskResults> {
async processTasksInternal(): Promise<TaskResults> {

const _this = this
let results: TaskResults = {}

Expand Down Expand Up @@ -182,7 +192,7 @@ export class TaskGraph {
const nodeDependencies = node.getDependencies()
for (const d of nodeDependencies) {
const dependant = this.getPredecessor(d.task) || d
await this.addTask(dependant.task)
await this.addTaskInternal(dependant.task)
dependant.addDependant(node)
}
}
Expand Down Expand Up @@ -349,3 +359,81 @@ class TaskNode {
return await this.task.process(dependencyResults)
}
}

// TODO: Add more typing to this class.

/*
Used by TaskGraph to prevent race conditions e.g. when calling addTask or
processTasks.
*/
class OperationQueue {
queue: object[]
draining: boolean

constructor(private taskGraph: TaskGraph) {
this.queue = []
this.draining = false
}

request(opRequest): Promise<any> {
let findFn

switch (opRequest.type) {

case "addTask":
findFn = (o) => o.type === "addTask" && o.task.getBaseKey() === opRequest.task.getBaseKey()
break

case "processTasks":
findFn = (o) => o.type === "processTasks"
break
}

const existingOp = this.queue.find(findFn)

const prom = new Promise((resolver) => {
if (existingOp) {
existingOp["resolvers"].push(resolver)
} else {
this.queue.push({ ...opRequest, resolvers: [resolver] })
}
})

if (!this.draining) {
this.process()
}

return prom
}

async process() {
this.draining = true
const op = this.queue.shift()

if (!op) {
this.draining = false
return
}

switch (op["type"]) {

case "addTask":
const task = op["task"]
await this.taskGraph.addTaskInternal(task)
for (const resolver of op["resolvers"]) {
resolver()
}
break

case "processTasks":
const results = await this.taskGraph.processTasksInternal()
for (const resolver of op["resolvers"]) {
resolver(results)
}
break
}

this.process()
}

}
1 change: 0 additions & 1 deletion src/tasks/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { LogEntry } from "../logger"
import { PluginContext } from "../plugin-context"
import { BuildTask } from "./build"
import { values } from "lodash"
import { Service } from "../types/service"
import { Task } from "../types/task"
import {
Service,
Expand Down
4 changes: 2 additions & 2 deletions src/tasks/push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
*/

import chalk from "chalk"
import { Task } from "../task-graph"
import { PluginContext } from "../plugin-context"
import { BuildTask } from "./build"
import { Module } from "../types/module"
import { EntryStyle } from "../logger/types"
import { PushResult } from "../types/plugin"
import { Task } from "../types/task"

export class PushTask<T extends Module<any>> extends Task {
type = "push"
Expand All @@ -31,7 +31,7 @@ export class PushTask<T extends Module<any>> extends Task {
return [new BuildTask(this.ctx, this.module, this.forceBuild)]
}

getKey() {
getName() {
// TODO: Include version in the task key (may need to make this method async).
return this.module.name
}
Expand Down
2 changes: 0 additions & 2 deletions src/tasks/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
*/

import { PluginContext } from "../plugin-context"
import { Task } from "../task-graph"
import { Module, TestSpec } from "../types/module"
import { BuildTask } from "./build"
import { TestResult } from "../types/plugin"
import { DeployTask } from "./deploy"
import { TestResult } from "../types/plugin"
import { Task } from "../types/task"
Expand Down
8 changes: 8 additions & 0 deletions src/types/task.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
/*
* Copyright (C) 2018 Garden Technologies, Inc. <[email protected]>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

import { TaskResults } from "../task-graph"
import { v1 as uuidv1 } from "uuid"

Expand Down
4 changes: 1 addition & 3 deletions test/src/build-dir.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import { pathExists, readdir } from "fs-extra"
import { expect } from "chai"
import { values } from "lodash"
import { BuildTask } from "../../src/tasks/build"
import {
makeTestGarden,
} from "../helpers"
import { makeTestGarden, } from "../helpers"

/*
Module dependency diagram for test-project-build-products
Expand Down
Loading

0 comments on commit ae79785

Please sign in to comment.