Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Various Task Manage Speedups #373

Merged
merged 10 commits into from
Mar 25, 2020
2 changes: 1 addition & 1 deletion core/src/main/scala/dagr/core/execsystem/TaskManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
logDirectory: Option[Path] = None,
scheduler: Scheduler = TaskManagerDefaults.defaultScheduler,
simulate: Boolean = false,
sleepMilliseconds: Int = 250
sleepMilliseconds: Int = 10
nh13 marked this conversation as resolved.
Show resolved Hide resolved
) extends TaskManagerLike with TaskTracker with FinalStatusReporter with LazyLogging {

private val actualScriptsDirectory = scriptsDirectory getOrElse Io.makeTempDir("scripts")
Expand Down
41 changes: 24 additions & 17 deletions core/src/main/scala/dagr/core/tasksystem/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package dagr.core.tasksystem
import com.fulcrumgenomics.commons.CommonsDef.unreachable
import dagr.core.execsystem.TaskExecutionInfo

import scala.annotation.tailrec
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks._
Expand Down Expand Up @@ -68,7 +69,7 @@ object Task {
* @param task one or more tasks in the graph to check.
* @return an iterator over the strongly connected components, each given as a set of tasks.
*/
private[core] def findStronglyConnectedComponents(task: Task*): Set[Set[Task]] = {
private[core] def findStronglyConnectedComponents(task: Task*): Iterator[Set[Task]] = {

// 1. find all tasks connected to this task
val visited: mutable.Set[Task] = new mutable.HashSet[Task]()
Expand All @@ -77,19 +78,20 @@ object Task {
while (toVisit.nonEmpty) {
val nextTask: Task = toVisit.head
toVisit -= nextTask
(nextTask.tasksDependedOn.toList ::: nextTask.tasksDependingOnThisTask.toList).foreach(t => if (!visited.contains(t)) toVisit += t)
nextTask.tasksDependedOn.foreach(t => if (!visited.contains(t)) toVisit += t)
nextTask.tasksDependingOnThisTask.foreach(t => if (!visited.contains(t)) toVisit += t)
visited += nextTask
}

// 2. Runs Tarjan's strongly connected components algorithm
val data: TarjanData = new TarjanData
visited.filterNot(data.indexes.contains).foreach(v => findStronglyConnectedComponent(v, data))
visited.iterator.filterNot(data.indexes.contains).foreach(v => findStronglyConnectedComponent(v, data))

// return all the components
data.components.map(component => component.toSet).toSet
data.components.iterator.filter(_.nonEmpty).map(component => component.toSet)
}

/** Indicates if a given set of tasks that are strongly connected components contains a cycle. This is the
/** Indicates if a given set of tasks contains a cycle that is a strongly connected component. This is the
* case if the set size is greater than one, or the task depends on itself. See [[Task.findStronglyConnectedComponents()]]
* for how to retrieve strongly connected components from a task.
*
Expand All @@ -101,8 +103,8 @@ object Task {
else {
component.headOption match {
case Some(task) =>
task.tasksDependedOn.toSet.contains(task) ||
task.tasksDependingOnThisTask.toSet.contains(task)
task.tasksDependedOn.iterator.contains(task) ||
task.tasksDependingOnThisTask.iterator.contains(task)
case _ => false
}
}
Expand All @@ -118,7 +120,7 @@ object Task {
data.onStack += v

// Consider successors of v
for(w <- v.tasksDependedOn) { // could alternatively use task.getTasksDependingOnThisTask
for (w <- v.tasksDependedOn) { // could alternatively use task.getTasksDependingOnThisTask
if (!data.indexes.contains(w)) {
// Successor w has not yet been visited; recurse on it
findStronglyConnectedComponent(w, data)
Expand All @@ -133,15 +135,20 @@ object Task {
// If v is a root node, pop the stack and generate an SCC
if (data.indexes(v) == data.lowLink(v)) {
val component: mutable.Set[Task] = new mutable.HashSet[Task]()
breakable {
while (data.stack.nonEmpty) {
val w: Task = data.stack.pop()
data.onStack -= w
component += w
if (w == v) break
}
}
data.components += component
buildComponent(data, v, component)
if (component.nonEmpty) data.components += component
}
}

/** Pops tasks off the Tarjan stack into `component` until `v` has been popped or the stack is exhausted.
  *
  * Every popped task is also removed from `data.onStack`, so the stack and its membership set are
  * updated together.
  *
  * @param data      the Tarjan bookkeeping; its `stack` and `onStack` are mutated.
  * @param v         the root task of the component; popping it ends the recursion.
  * @param component the accumulator that receives every popped task.
  */
@tailrec
private def buildComponent(data: TarjanData, v: Task, component: mutable.Set[Task]): Unit = {
  // Guard on the stack itself: it is the structure being popped, so checking it (rather than the
  // companion `onStack` set) can never lead to a pop on an empty stack.
  if (data.stack.nonEmpty) {
    val w: Task = data.stack.pop()
    data.onStack -= w
    component += w
    // Stop once the root has been popped; otherwise keep unwinding the stack.
    if (w != v) buildComponent(data, v, component)
  }
}
}
Expand Down