diff --git a/core/src/main/scala/dagr/core/execsystem/GraphNode.scala b/core/src/main/scala/dagr/core/execsystem/GraphNode.scala index 386a44f8..3fcdcd10 100644 --- a/core/src/main/scala/dagr/core/execsystem/GraphNode.scala +++ b/core/src/main/scala/dagr/core/execsystem/GraphNode.scala @@ -43,7 +43,7 @@ class GraphNode(var task: Task, var state: GraphNodeState.Value = GraphNodeState.PREDECESSORS_AND_UNEXPANDED, val enclosingNode: Option[GraphNode] = None) extends BaseGraphNode { - private val _predecessors = new ListBuffer[GraphNode]() + private val _predecessors = new scala.collection.mutable.LinkedHashSet[GraphNode]() _predecessors ++= predecessorNodes @@ -93,9 +93,12 @@ class GraphNode(var task: Task, addPredecessors(predecessor.toSeq:_*) } + /** Gets the number of predecessors */ + def numPredecessors: Int = _predecessors.size + /** Get the predecessors * * @return the current set of predecessors, if any */ - def predecessors: List[GraphNode] = _predecessors.toList + def predecessors: Iterable[GraphNode] = _predecessors } diff --git a/core/src/main/scala/dagr/core/execsystem/NaiveScheduler.scala b/core/src/main/scala/dagr/core/execsystem/NaiveScheduler.scala index 61099fbc..bc91efa2 100644 --- a/core/src/main/scala/dagr/core/execsystem/NaiveScheduler.scala +++ b/core/src/main/scala/dagr/core/execsystem/NaiveScheduler.scala @@ -36,22 +36,15 @@ class NaiveScheduler extends Scheduler { remainingSystemMemory: Memory, remainingJvmMemory: Memory): Option[(UnitTask, ResourceSet)] = { val systemResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingSystemMemory) - val jvmResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingJvmMemory) + val jvmResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingJvmMemory) // Find the first task that can be executed readyTasks - .view // lazy - .map { // pick resources - case task: ProcessTask => (task, task.pickResources(systemResourceSet)) - case task: InJvmTask => (task, task.pickResources(jvmResourceSet)) - } - .find { // find the first that returned a resource set - case (_, Some(resourceSet)) => true - case _ => false - } - .map { // get the resource set - case (task, Some(resourceSet)) => (task, resourceSet) - case _ => throw new IllegalStateException("BUG") + .view + .flatMap { // pick resources + case task: ProcessTask => task.pickResources(systemResourceSet).map { rs => (task, rs) } + case task: InJvmTask => task.pickResources(jvmResourceSet).map { rs => (task, rs) } } + .headOption } /** Runs one round of scheduling, trying to schedule as many ready tasks as possible given the @@ -67,9 +60,9 @@ class NaiveScheduler extends Scheduler { private def scheduleOnce(readyTasks: Iterable[UnitTask], remainingSystemCores: Cores, remainingSystemMemory: Memory, - remainingJvmMemory: Memory): List[(UnitTask, ResourceSet)] = { + remainingJvmMemory: Memory): Seq[(UnitTask, ResourceSet)] = { // no more tasks ready to be scheduled - if (readyTasks.isEmpty) Nil + if (readyTasks.isEmpty) Seq.empty else { logger.debug(s"the resources were [System cores=" + remainingSystemCores.value + " System memory=" + Resource.parseBytesToSize(remainingSystemMemory.value) @@ -77,20 +70,19 @@ class NaiveScheduler extends Scheduler { // try one round of scheduling, and recurse if a task could be scheduled scheduleOneTask(readyTasks, remainingSystemCores, remainingSystemMemory, remainingJvmMemory) match { - case None => - Nil + case None => Seq.empty case Some((task: UnitTask, resourceSet: ResourceSet)) => logger.debug("task to schedule is [" + task.name + "]") logger.debug(s"task [${task.name}] uses the following resources [" + resourceSet + "]") List[(UnitTask, ResourceSet)]((task, resourceSet)) ++ (task match { - case processTask: ProcessTask => + case _: ProcessTask => scheduleOnce( readyTasks = readyTasks.filterNot(t => t == task), remainingSystemCores = remainingSystemCores - resourceSet.cores, remainingSystemMemory = remainingSystemMemory - resourceSet.memory, remainingJvmMemory = remainingJvmMemory ) - case inJvmTask: InJvmTask => + case _: InJvmTask => scheduleOnce( readyTasks = readyTasks.filterNot(t => t == task), remainingSystemCores = remainingSystemCores - resourceSet.cores, diff --git a/core/src/main/scala/dagr/core/execsystem/ResourceSet.scala b/core/src/main/scala/dagr/core/execsystem/ResourceSet.scala index b63f289e..45318873 100644 --- a/core/src/main/scala/dagr/core/execsystem/ResourceSet.scala +++ b/core/src/main/scala/dagr/core/execsystem/ResourceSet.scala @@ -56,13 +56,35 @@ case class ResourceSet(cores: Cores = Cores.none, memory: Memory = Memory.none) /** * Constructs a subset of this resource set with a fixed amount of memory and a variable - * number of cores. Will greedily assign the highest number of cores possible. + * number of cores. Will greedily assign the highest number of cores possible. If the maximum cores is a whole + * number, then a whole number will be returned, unless the minimum cores (which can be a fractional number of cores) + * is the only valid value. If the maximum cores is not a whole number, then the maximum fractional amount will be + * returned. + * + * Example 1: minCores=1, maxCores=5, this.cores=4.5, then 4 cores will be returned. + * Example 2: minCores=1, maxCores=5.1, this.cores=4.5, then 4.5 cores will be returned. + * Example 3: minCores=1, maxCores=5, this.cores=1.5, then 1 core will be returned. + * Example 4: minCores=1.5, maxCores=5, this.cores=1.5, then 1.5 cores will be returned. */ def subset(minCores: Cores, maxCores: Cores, memory: Memory) : Option[ResourceSet] = { - val min = minCores.value - val max = maxCores.value - val cores = Range.BigDecimal.inclusive(max, min, -1).find(cores => subset(Cores(cores.doubleValue), memory).isDefined) - cores.map(c => ResourceSet(Cores(c.doubleValue), memory)) + if (!subsettable(ResourceSet(minCores, memory))) None else { + val coresValue = { + // Try to return a whole number value if maxCores is a whole number. If no whole number exists that is greater + // than or equal to minCores, then just use minCores (which could be fractional). If maxCores is fractional, + // then return a fractional value. + if (maxCores.value.isValidInt) { + // Get the number of cores, but rounded down to get a whole number value + val minValue = Math.floor(Math.min(this.cores.value, maxCores.value)) + // If the number rounded down is smaller than the min-cores, then just return the min-cores + if (minValue < minCores.value) minCores.value else minValue + } else { // any fractional number will do + Math.min(this.cores.value, maxCores.value) + } + } + val resourceSet = ResourceSet(Cores(coresValue), memory) + require(subsettable(resourceSet)) + Some(resourceSet) + } } /** diff --git a/core/src/main/scala/dagr/core/execsystem/TaskManager.scala b/core/src/main/scala/dagr/core/execsystem/TaskManager.scala index d3046884..588346e2 100644 --- a/core/src/main/scala/dagr/core/execsystem/TaskManager.scala +++ b/core/src/main/scala/dagr/core/execsystem/TaskManager.scala @@ -24,13 +24,16 @@ package dagr.core.execsystem import java.nio.file.Path -import java.time.Instant +import java.time.{Duration, Instant} import dagr.core.DagrDef._ import com.fulcrumgenomics.commons.util.LazyLogging import dagr.core.tasksystem._ import com.fulcrumgenomics.commons.collection.BiMap import com.fulcrumgenomics.commons.io.{Io, PathUtil} +import dagr.core.execsystem + +import scala.annotation.tailrec /** The resources needed for the task manager */ object SystemResources { @@ -80,10 +83,22 @@ object TaskManagerDefaults extends LazyLogging { object TaskManager extends LazyLogging { import dagr.core.execsystem.TaskManagerDefaults._ + /** The initial time to wait between scheduling tasks. */ + val InitialSleepMillis: Int = 100 + /** The minimum time to wait between scheduling tasks. */ + val MinSleepMillis: Int = 10 + /** The maximum time to wait between scheduling tasks. */ + val MaxSleepMillis: Int = 1000 + /** The increased amount time to wait between scheduling tasks after nothing can be done (linear increase). */ + val StepSleepMillis: Int = 50 + /** The scaling factor to reduce (divide) the time by to wait between scheduling tasks (exponential backoff). */ + val BackoffSleepFactor: Float = 2f + /** The maximum time between two attempts to task scheduling attempts after which a warning is logged. */ + val SlowStepTimeSeconds: Int = 30 + /** Runs a given task to either completion, failure, or inability to schedule. This will terminate tasks that were still running before returning. * * @param task the task to run - * @param sleepMilliseconds the time to wait in milliseconds to wait between trying to schedule tasks. * @param taskManagerResources the set of task manager resources, otherwise we use the default * @param scriptsDirectory the scripts directory, otherwise we use the default * @param logDirectory the log directory, otherwise we use the default @@ -92,7 +107,6 @@ object TaskManager extends LazyLogging { * @return a bi-directional map from the set of tasks to their execution information. */ def run(task: Task, - sleepMilliseconds: Int = 1000, taskManagerResources: Option[SystemResources] = Some(defaultTaskManagerResources), scriptsDirectory: Option[Path] = None, logDirectory: Option[Path] = None, @@ -105,8 +119,7 @@ object TaskManager extends LazyLogging { scriptsDirectory = scriptsDirectory, logDirectory = logDirectory, scheduler = scheduler.getOrElse(defaultScheduler), - simulate = simulate, - sleepMilliseconds = sleepMilliseconds + simulate = simulate ) taskManager.addTask(task = task) @@ -114,6 +127,7 @@ object TaskManager extends LazyLogging { taskManager.taskToInfoBiMapFor } + } /** A manager of tasks. @@ -125,16 +139,17 @@ object TaskManager extends LazyLogging { * @param logDirectory the log directory, otherwise a temporary directory will be used * @param scheduler the scheduler, otherwise we use the default * @param simulate true if we are to simulate running tasks, false otherwise - * @param sleepMilliseconds the time to wait in milliseconds to wait between trying to schedule tasks. */ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.defaultTaskManagerResources, scriptsDirectory: Option[Path] = None, logDirectory: Option[Path] = None, scheduler: Scheduler = TaskManagerDefaults.defaultScheduler, - simulate: Boolean = false, - sleepMilliseconds: Int = 250 + simulate: Boolean = false + ) extends TaskManagerLike with TaskTracker with FinalStatusReporter with LazyLogging { + private var curSleepMilliseconds: Int = TaskManager.InitialSleepMillis + private val actualScriptsDirectory = scriptsDirectory getOrElse Io.makeTempDir("scripts") protected val actualLogsDirectory = logDirectory getOrElse Io.makeTempDir("logs") @@ -313,20 +328,21 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de */ private def updateCompletedTasks(): Map[TaskId, (Int, Boolean)] = { val completedTasks: Map[TaskId, (Int, Boolean)] = taskExecutionRunner.completedTasks() - val emptyTasks = graphNodesInStateFor(GraphNodeState.NO_PREDECESSORS).filter(_.task.isInstanceOf[Task.EmptyTask]).toSeq - val completedTaskIds = completedTasks.keys ++ emptyTasks.map(_.taskId) + val emptyTasks = graphNodesInStateFor(GraphNodeState.NO_PREDECESSORS).filter(_.task.isInstanceOf[Task.EmptyTask]) emptyTasks.foreach { node => node.taskInfo.status = TaskStatus.SUCCEEDED + processCompletedTask(node.taskId) logger.debug("updateCompletedTasks: empty task [" + node.task.name + "] completed") } - completedTaskIds.foreach { taskId => + completedTasks.keysIterator.foreach { taskId => processCompletedTask(taskId) val name = this(taskId).task.name val status = this(taskId).taskInfo.status logger.debug("updateCompletedTasks: task [" + name + "] completed with task status [" + status + "]") } + completedTasks } @@ -344,7 +360,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de logger.debug("updateOrphans: found an orphan task [" + node.task.name + "] that has [" + predecessorsOf(task=node.task).getOrElse(Nil).size + "] predecessors") node.addPredecessors(predecessorsOf(task=node.task).get) - logger.debug("updateOrphans: orphan task [" + node.task.name + "] now has [" + node.predecessors.size + "] predecessors") + logger.debug("updateOrphans: orphan task [" + node.task.name + "] now has [" + node.numPredecessors + "] predecessors") // update its state node.state = PREDECESSORS_AND_UNEXPANDED }) @@ -358,14 +374,15 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de * to have no predecessors: [[NO_PREDECESSORS]]. If we find the former case, we need to perform this procedure again, * since some tasks go strait to succeeded and we may have successor tasks (children) that can now execute. */ + @tailrec private def updatePredecessors(): Unit = { var hasMore = false - for (node <- graphNodesWithPredecessors) { - node.predecessors.filter(p => p.state == GraphNodeState.COMPLETED && TaskStatus.isTaskDone(p.taskInfo.status, failedIsDone=false)).map(p => node.removePredecessor(p)) - logger.debug("updatePredecessors: examining task [" + node.task.name + "] for predecessors: " + node.hasPredecessor) + graphNodesWithPredecessors.foreach { node => + node.predecessors.filter(p => p.state == GraphNodeState.COMPLETED && TaskStatus.isTaskDone(p.taskInfo.status, failedIsDone=false)).foreach(p => node.removePredecessor(p)) + //logger.debug("updatePredecessors: examining task [" + node.task.name + "] for predecessors: " + node.hasPredecessor) // - if this node has already been expanded and now has no predecessors, then move it to the next state. // - if it hasn't been expanded and now has no predecessors, it should get expanded later - if (!node.hasPredecessor) logger.debug(s"updatePredecessors: has node state: ${node.state}") + //if (!node.hasPredecessor) logger.debug(s"updatePredecessors: has node state: ${node.state}") if (!node.hasPredecessor && node.state == ONLY_PREDECESSORS) { val taskInfo = node.taskInfo node.task match { @@ -375,7 +392,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de taskInfo.status = TaskStatus.SUCCEEDED hasMore = true // try again for all successors, since we have more nodes that have completed } - logger.debug(s"updatePredecessors: task [${node.task.name}] now has node state [${node.state}] and status [${taskInfo.status}]") + //logger.debug(s"updatePredecessors: task [${node.task.name}] now has node state [${node.state}] and status [${taskInfo.status}]") } } @@ -386,7 +403,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de /** Invokes `getTasks` on the task associated with the graph node. * - * (1) In the case that `getTasks` returns the same exact task, check for cycles and verify it is a [[UnitTask]]. Since + * (1) In the case that `getTasks` returns the same exact task, it verifies it is a [[UnitTask]]. Since * the task already has an execution node, it must have already passed to [[addTask()]]. * * (2) In the case that `getTasks` returns a different task, or more than one task, set the submission date of the node, @@ -410,8 +427,10 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de false case x :: Nil if x == node.task => // one task and it returned itself logger.debug(f"invokeGetTasks 2 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}") + // Developer note: removing the check for cycles here because we should have checked for cycles when the task + // was added! // check for cycles only when we have a unit task for which calling [[getTasks] returns itself. - checkForCycles(task = node.task) + // checkForCycles(task = node.task) // verify we have a UnitTask node.task match { case _: UnitTask => @@ -425,10 +444,10 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de // we will make this task dependent on the tasks it creates... if (tasks.contains(node.task)) throw new IllegalStateException(s"Task [${node.task.name}] contained itself in the list returned by getTasks") // track the new tasks. If they are already added, that's fine too. - val taskIds: Seq[TaskId] = tasks.map { task => addTask(task = task, enclosingNode = Some(node), ignoreExists = true) } + val taskIds: Seq[TaskId] = addTasks(tasks, enclosingNode = Some(node), ignoreExists = true) // make this node dependent on those tasks taskIds.map(taskId => node.addPredecessors(this(taskId))) - // we may need to update precedessors if a returned task was already completed + // we may need to update predecessors if a returned task was already completed if (tasks.flatMap(t => graphNodeFor(t)).exists(_.state == GraphNodeState.COMPLETED)) updatePredecessors() // TODO: we could check each new task to see if they are in the PREDECESSORS_AND_UNEXPANDED state true @@ -544,8 +563,8 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de val runningTasks: Map[UnitTask, ResourceSet] = runningTasksMap // get the tasks that are eligible for execution (tasks with no dependents) - val (emptyTasks: List[Task], readyTasks: List[Task]) = { - graphNodesInStateFor(NO_PREDECESSORS).map(_.task).toList.partition(_.isInstanceOf[Task.EmptyTask]) + val (emptyTasks: Seq[Task], readyTasks: Seq[Task]) = { + graphNodesInStateFor(NO_PREDECESSORS).map(_.task).toSeq.partition(_.isInstanceOf[Task.EmptyTask]) } logger.debug(s"stepExecution: found ${readyTasks.size} readyTasks tasks and ${emptyTasks.size} empty tasks") @@ -571,6 +590,14 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de logger.debug("stepExecution: finishing one round of execution") logger.debug("stepExecution: found " + runningTasks.size + " running tasks and " + tasksToSchedule.size + " tasks to schedule") + // Update the current sleep time: exponential reduction if we could do **anything**, otherwise linear increase. + if (canDoAnything) { + curSleepMilliseconds = Math.max(TaskManager.MinSleepMillis, (curSleepMilliseconds / TaskManager.BackoffSleepFactor).toInt) + } + else { + curSleepMilliseconds = Math.min(TaskManager.MaxSleepMillis, curSleepMilliseconds + TaskManager.StepSleepMillis) + } + ( readyTasks, tasksToSchedule.keys, @@ -582,12 +609,28 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de override def runToCompletion(failFast: Boolean): BiMap[Task, TaskExecutionInfo] = { var allDone = false while (!allDone) { + + // Step the execution once + val startTime = Instant.now() val (readyTasks, tasksToSchedule, runningTasks, _) = stepExecution() - Thread.sleep(sleepMilliseconds) + // Warn if the single step in execution "took a long time" + val stepExecutionDuration = Duration.between(startTime, Instant.now()).getSeconds + if (stepExecutionDuration > TaskManager.SlowStepTimeSeconds) { + logger.warning("*" * 80) + logger.warning(s"A single step in execution was > ${TaskManager.SlowStepTimeSeconds}s (${stepExecutionDuration}s).") + val infosByStatus: Map[execsystem.TaskStatus.Value, Iterable[TaskExecutionInfo]] = this.taskToInfoBiMapFor.values.groupBy(_.status) + TaskStatus.values.filter(infosByStatus.contains).foreach { status => + logger.warning(s"Found ${infosByStatus(status).size} tasks with status: $status") + } + logger.warning("*" * 80) + } + + logger.debug(s"Sleeping ${curSleepMilliseconds}ms") + if (curSleepMilliseconds > 0) Thread.sleep(curSleepMilliseconds) - // check if we have only completed or orphan all tasks - allDone = graphNodesInStatesFor(List(ORPHAN, COMPLETED)).size == graphNodes.size + // check if we have only completed or orphan tasks + allDone = allGraphNodesInStates(Set(ORPHAN, COMPLETED)) if (!allDone && runningTasks.isEmpty && tasksToSchedule.isEmpty) { if (readyTasks.nonEmpty) { diff --git a/core/src/main/scala/dagr/core/execsystem/TaskTracker.scala b/core/src/main/scala/dagr/core/execsystem/TaskTracker.scala index b6fa80aa..8d615221 100644 --- a/core/src/main/scala/dagr/core/execsystem/TaskTracker.scala +++ b/core/src/main/scala/dagr/core/execsystem/TaskTracker.scala @@ -28,7 +28,7 @@ import java.time.Instant import com.fulcrumgenomics.commons.CommonsDef._ import com.fulcrumgenomics.commons.collection.BiMap -import com.fulcrumgenomics.commons.util.LazyLogging +import com.fulcrumgenomics.commons.util.{LazyLogging, SimpleCounter} import dagr.core.DagrDef._ import dagr.core.execsystem.TaskStatus._ import dagr.core.tasksystem.Task @@ -66,73 +66,67 @@ trait TaskTracker extends TaskManagerLike with LazyLogging { private val idToTask: mutable.Map[TaskId, Task] = mutable.Map[TaskId, Task]() private val idToNode: mutable.Map[TaskId, GraphNode] = mutable.Map[TaskId, GraphNode]() - override def addTask(task: Task): TaskId = { - addTask(task=task, enclosingNode=None, ignoreExists=false) - } - - /** Adds a task to be managed - * - * Throws an [[IllegalArgumentException]] if a cycle was found after logging each strongly connected component with - * a cycle in the graph. - * - * @param ignoreExists true if we just return the task id for already added tasks, false if we are to throw an [[IllegalArgumentException]] - * @param task the given task. - * @return the task identifier. + /** Adds a task to be managed, but does not check for cycles, or that the next identifier is currently used. This + * should be performed by the caller. */ - protected[execsystem] def addTask(task: Task, enclosingNode: Option[GraphNode], ignoreExists: Boolean = false): TaskId = { - // Make sure the id we will assign the task are not being tracked. - if (idToTask.contains(nextId)) throw new IllegalArgumentException(s"Task '${task.name}' with id '$nextId' was already added!") - if (idToNode.contains(nextId)) throw new IllegalArgumentException(s"Task '${task.name}' with id '$nextId' was already added!") - - taskFor(task) match { - case Some(id) if ignoreExists => id - case Some(id) => throw new IllegalArgumentException(s"Task '${task.name}' with id '$id' was already added!") - case None => - // check for cycles - checkForCycles(task = task) - - // set the task id - val id = yieldAndThen(nextId) {nextId += 1} - // set the task info - require(task._taskInfo.isEmpty) // should not have any info! - val info = new TaskExecutionInfo( - task=task, - taskId=id, - status=UNKNOWN, - script=scriptPathFor(task=task, id=id, attemptIndex=1), - logFile=logPathFor(task=task, id=id, attemptIndex=1), - submissionDate=Some(Instant.now()) - ) - task._taskInfo = Some(info) - - // create the graph node - val node = predecessorsOf(task=task) match { - case None => new GraphNode(task=task, predecessorNodes=Nil, state=GraphNodeState.ORPHAN, enclosingNode=enclosingNode) - case Some(predecessors) => new GraphNode(task=task, predecessorNodes=predecessors, enclosingNode=enclosingNode) - } + private def addTaskNoChecking(task: Task, enclosingNode: Option[GraphNode] = None): TaskId = { + // set the task id + val id = yieldAndThen(nextId) {nextId += 1} + // set the task info + require(task._taskInfo.isEmpty) // should not have any info! + val info = new TaskExecutionInfo( + task = task, + taskId = id, + status = UNKNOWN, + script = scriptPathFor(task=task, id=id, attemptIndex=1), + logFile = logPathFor(task=task, id=id, attemptIndex=1), + submissionDate = Some(Instant.now()) + ) + task._taskInfo = Some(info) + + // create the graph node + val node = predecessorsOf(task=task) match { + case None => new GraphNode(task=task, predecessorNodes=Nil, state=GraphNodeState.ORPHAN, enclosingNode=enclosingNode) + case Some(predecessors) => new GraphNode(task=task, predecessorNodes=predecessors, enclosingNode=enclosingNode) + } - // update the lookups - idToTask.put(id, task) - idToNode.put(id, node) + // update the lookups + idToTask.put(id, task) + idToNode.put(id, node) - id - } + id } - /** Adds tasks to be managed + /** Adds tasks to be managed. * * @param tasks the given tasks. + * @param enclosingNode the graph node of the parent task that generated this task (if any) * @param ignoreExists true if we just return the task id for already added tasks, false if we are to throw an [[IllegalArgumentException]] * @return the task identifiers. */ - protected[execsystem] def addTasks(tasks: Iterable[Task], enclosingNode: Option[GraphNode] = None, ignoreExists: Boolean = false): List[TaskId] = { - tasks.map(task => addTask(task=task, enclosingNode=enclosingNode, ignoreExists=ignoreExists)).toList - } + protected[execsystem] def addTasks(tasks: Seq[Task], enclosingNode: Option[GraphNode] = None, ignoreExists: Boolean = false): Seq[TaskId] = { + // Make sure the id we will assign the task are not being tracked. + if (idToTask.contains(nextId)) throw new IllegalArgumentException(s"Task id '$nextId' was already added!") + + val tasksToAdd = tasks.flatMap { task => + taskFor(task) match { + case Some(_) if ignoreExists => None + case Some(id) => throw new IllegalArgumentException(s"Task '${task.name}' with id '$id' was already added!") + case None => Some(task) + } + } + + checkForCycles(tasksToAdd:_*) - override def addTasks(tasks: Task*): Seq[TaskId] = { - tasks.map(task => addTask(task, enclosingNode=None, ignoreExists=false)) + tasks.map { task => taskFor(task).getOrElse(addTaskNoChecking(task, enclosingNode)) } } + /** Adds a task to be managed. */ + override def addTask(task: Task): TaskId = addTasks(task).head + + /** Adds one or more tasks to be managed. */ + override def addTasks(tasks: Task*): Seq[TaskId] = this.addTasks(tasks, enclosingNode=None, ignoreExists=false) + override def taskFor(id: TaskId): Option[Task] = idToTask.get(id) override def taskExecutionInfoFor(id: TaskId): Option[TaskExecutionInfo] = idToNode.get(id).map(_.taskInfo) @@ -182,6 +176,8 @@ trait TaskTracker extends TaskManagerLike with LazyLogging { */ def graphNodes: Iterable[GraphNode] = idToNode.values + def numGraphNodes: Int = idToNode.size + override def apply(id: TaskId): GraphNode = { this.graphNodeFor(id=id) match { case Some(node) => node @@ -191,7 +187,7 @@ trait TaskTracker extends TaskManagerLike with LazyLogging { override def taskToInfoBiMapFor: BiMap[Task, TaskExecutionInfo] = { val map: BiMap[Task, TaskExecutionInfo] = new BiMap[Task, TaskExecutionInfo]() - idToTask.foreach { case (id, task) => map.add(task, task.taskInfo) } + idToTask.foreach { case (_, task) => map.add(task, task.taskInfo) } map } @@ -259,24 +255,22 @@ trait TaskTracker extends TaskManagerLike with LazyLogging { * * @param task a task in the graph to check. */ - protected def checkForCycles(task: Task): Unit = { + protected def checkForCycles(task: Task*): Unit = { // check for cycles - if (Task.hasCycle(task)) { + if (Task.hasCycle(task:_*)) { logger.error("Task was part of a graph that had a cycle") - for (component <- Task.findStronglyConnectedComponents(task = task)) { + for (component <- Task.findStronglyConnectedComponents(task = task:_*)) { if (Task.isComponentACycle(component = component)) { logger.error("Tasks were part of a strongly connected component with a cycle: " + component.map(t => s"[${t.name}]").mkString(", ")) } } - throw new IllegalArgumentException(s"Task was part of a graph that had a cycle [${task.name}]") + throw new IllegalArgumentException(s"Task(s) had cyclical dependencies [${task.map(_.name).mkString(",")}]") } } /** Returns true if all the predecessors of this task are known (have been added), false otherwise. */ - protected def allPredecessorsAdded(task: Task): Boolean = { - predecessorsOf(task = task).nonEmpty - } + protected def allPredecessorsAdded(task: Task): Boolean = predecessorsOf(task = task).nonEmpty /** Gets the predecessor graph nodes of the given task in the execution graph. * @@ -321,4 +315,9 @@ trait TaskTracker extends TaskManagerLike with LazyLogging { protected def graphNodesWithPredecessors: Iterable[GraphNode] = { graphNodes.filter(node => GraphNodeState.hasPredecessors(node.state)) } + + protected def allGraphNodesInStates(states: Set[GraphNodeState.Value]): Boolean = { + graphNodes.forall(n => states.contains(n.state)) + } + } diff --git a/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala b/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala index 910616a8..d2b76adf 100644 --- a/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala +++ b/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala @@ -28,6 +28,8 @@ import java.nio.file.Path import com.fulcrumgenomics.commons.io.Io import com.fulcrumgenomics.commons.util.LazyLogging +import scala.collection.mutable + /** Simple trait to track tasks within a pipeline */ abstract class Pipeline(val outputDirectory: Option[Path] = None, private var prefix: Option[String] = None, @@ -78,8 +80,21 @@ abstract class Pipeline(val outputDirectory: Option[Path] = None, /** Recursively navigates dependencies, starting from the supplied task, and add all children to this.tasks. */ private def addChildren(task : Task) : Unit = { - tasks ++= task.tasksDependingOnThisTask - task.tasksDependingOnThisTask.foreach(addChildren) + // Developer note: we may have very deep dependency graphs, so this implementation avoids stack overflows + // Developer note: we use a Set here so that we do not recurse on the same task twice. + // Suppose we have `A ==> (B :: C)` and `B ==> C`. Even thought this could be simplified to `A ==> B ==> C`, that's + // up to the caller, and we post-processing of the DAG. So when `addChildren` gets called on `A`, it recurses on + // `B` and `C`. Since `C` depends on `C`, without the uniqueness check we recurse on `C` in the `addChildren` + // call on `B`. + val toVisit: mutable.Set[Task] = mutable.HashSet[Task](task) + while (toVisit.nonEmpty) { + val nextTask: Task = toVisit.head + toVisit -= nextTask + nextTask.tasksDependingOnThisTask.filterNot(tasks.contains).foreach { child => + tasks += child + toVisit += child + } + } } /** True if we this pipeline is tracking this direct ancestor task, false otherwise. */ diff --git a/core/src/main/scala/dagr/core/tasksystem/Task.scala b/core/src/main/scala/dagr/core/tasksystem/Task.scala index 4f66f29a..e81f0892 100644 --- a/core/src/main/scala/dagr/core/tasksystem/Task.scala +++ b/core/src/main/scala/dagr/core/tasksystem/Task.scala @@ -26,6 +26,7 @@ package dagr.core.tasksystem import com.fulcrumgenomics.commons.CommonsDef.unreachable import dagr.core.execsystem.TaskExecutionInfo +import scala.annotation.tailrec import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.util.control.Breaks._ @@ -57,39 +58,40 @@ object Task { * @param task the task to begin search. * @return true if the DAG to which this task belongs has a cycle, false otherwise. */ - private[core] def hasCycle(task: Task): Boolean = { - findStronglyConnectedComponents(task).exists(component => isComponentACycle(component)) + private[core] def hasCycle(task: Task*): Boolean = { + findStronglyConnectedComponents(task:_*).exists(component => isComponentACycle(component)) } /** Finds all the strongly connected components of the graph to which this task is connected. * * Uses Tarjan's algorithm: https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm * - * @param task a task in the graph to check. + * @param task one or more tasks in the graph to check. * @return the set of strongly connected components. */ - private[core] def findStronglyConnectedComponents(task: Task): Set[Set[Task]] = { + private[core] def findStronglyConnectedComponents(task: Task*): Iterator[Set[Task]] = { // 1. find all tasks connected to this task val visited: mutable.Set[Task] = new mutable.HashSet[Task]() - val toVisit: mutable.Set[Task] = mutable.HashSet[Task](task) + val toVisit: mutable.Set[Task] = mutable.HashSet[Task](task:_*) while (toVisit.nonEmpty) { val nextTask: Task = toVisit.head toVisit -= nextTask - (nextTask.tasksDependedOn.toList ::: nextTask.tasksDependingOnThisTask.toList).foreach(t => if (!visited.contains(t)) toVisit += t) + nextTask.tasksDependedOn.foreach(t => if (!visited.contains(t)) toVisit += t) + nextTask.tasksDependingOnThisTask.foreach(t => if (!visited.contains(t)) toVisit += t) visited += nextTask } // 2. Runs Tarjan's strongly connected components algorithm val data: TarjanData = new TarjanData - visited.filterNot(data.indexes.contains).foreach(v => findStronglyConnectedComponent(v, data)) + visited.iterator.filterNot(data.indexes.contains).foreach(v => findStronglyConnectedComponent(v, data)) // return all the components - data.components.map(component => component.toSet).toSet + data.components.iterator.filter(_.nonEmpty).map(component => component.toSet) } - /** Indicates if a given set of tasks that are strongly connected components contains a cycle. This is the + /** Indicates if a given set of tasks contain a cycle that is a strongly connected component. This is the * case if the set size is greater than one, or the task is depends on itself. See [[Task.findStronglyConnectedComponents()]] * for how to retrieve strongly connected components from a task. * @@ -101,8 +103,8 @@ object Task { else { component.headOption match { case Some(task) => - task.tasksDependedOn.toSet.contains(task) || - task.tasksDependingOnThisTask.toSet.contains(task) + task.tasksDependedOn.iterator.contains(task) || + task.tasksDependingOnThisTask.iterator.contains(task) case _ => false } } @@ -118,30 +120,35 @@ object Task { data.onStack += v // Consider successors of v - for(w <- v.tasksDependedOn) { // could alternatively use task.getTasksDependingOnThisTask + for (w <- v.tasksDependedOn) { // could alternatively use task.getTasksDependingOnThisTask if (!data.indexes.contains(w)) { // Successor w has not yet been visited; recurse on it findStronglyConnectedComponent(w, data) - data.lowLink.put(v, math.min(data.lowLink.get(v).get, data.lowLink.get(w).get)) + data.lowLink.put(v, math.min(data.lowLink(v), data.lowLink(w))) } else if (data.onStack(w)) { // Successor w is in stack S and hence in the current SCC - data.lowLink.put(v, math.min(data.lowLink.get(v).get, data.lowLink.get(w).get)) + data.lowLink.put(v, math.min(data.lowLink(v), data.lowLink(w))) } } // If v is a root node, pop the stack and generate an SCC - if (data.indexes.get(v).get == data.lowLink.get(v).get) { + if (data.indexes(v) == data.lowLink(v)) { val component: mutable.Set[Task] = new mutable.HashSet[Task]() - breakable { - while (data.stack.nonEmpty) { - val w: Task = data.stack.pop() - data.onStack -= w - component += w - if (w == v) break - } - } - data.components += component + buildComponent(data, v, component) + if (component.nonEmpty) data.components += component + } + } + + @tailrec + private def buildComponent(data: TarjanData, v: Task, component: mutable.Set[Task]): Unit = { + if (data.onStack.isEmpty) () + else { + val w: Task = data.stack.pop() + data.onStack -= w + component += w + if (w == v) () + else buildComponent(data, v, component) } } } @@ -190,9 +197,9 @@ trait Task extends Dependable { /** Removes this as a dependency for other */ override def !=>(other: Dependable): Unit = other.headTasks.foreach(_.removeDependency(this)) - override def headTasks: Iterable[Task] = Seq(this) - override def tailTasks: Iterable[Task] = Seq(this) - override def allTasks: Iterable[Task] = Seq(this) + override def headTasks: Iterable[Task] = Some(this) + override def tailTasks: Iterable[Task] = Some(this) + override def allTasks: Iterable[Task] = Some(this) /** * Removes a dependency by removing the supplied task from the list of dependencies for this task diff --git a/core/src/test/scala/dagr/core/execsystem/ResourceSetTest.scala b/core/src/test/scala/dagr/core/execsystem/ResourceSetTest.scala index 7ffe19d6..0bb42018 100644 --- a/core/src/test/scala/dagr/core/execsystem/ResourceSetTest.scala +++ b/core/src/test/scala/dagr/core/execsystem/ResourceSetTest.scala @@ -25,8 +25,9 @@ package dagr.core.execsystem import dagr.core.UnitSpec +import org.scalatest.OptionValues -class ResourceSetTest extends UnitSpec { +class ResourceSetTest extends UnitSpec with OptionValues { "ResourceSet.isEmpty" should "return true for the empty resource set" in { ResourceSet.empty.isEmpty shouldBe true } @@ -43,4 +44,30 @@ class ResourceSetTest extends UnitSpec { running = running - Cores(10) running.cores.value shouldBe 0 } + + it should "subset resources" in { + // doc examples + ResourceSet(4.5, 10).subset(Cores(1), Cores(5), Memory(10)).value shouldBe ResourceSet(4, 10) + ResourceSet(4.5, 10).subset(Cores(1), Cores(5.1), Memory(10)).value shouldBe ResourceSet(4.5, 10) + ResourceSet(1.5, 10).subset(Cores(1), Cores(5), Memory(10)).value shouldBe ResourceSet(1, 10) + ResourceSet(1.5, 10).subset(Cores(1.5), Cores(5), Memory(10)).value shouldBe ResourceSet(1.5, 10) + + val resources = ResourceSet(10, 10) + resources.subset(ResourceSet(10, 10)).value shouldBe ResourceSet(10, 10) + resources.subset(ResourceSet(10.5, 10)).isDefined shouldBe false + resources.subset(ResourceSet(9.5, 10)).value shouldBe ResourceSet(9.5, 10) + resources.subset(ResourceSet(5, 5)).value shouldBe ResourceSet(5, 5) + resources.subset(ResourceSet(4.5, 5)).value shouldBe ResourceSet(4.5, 5) + + val halfACore = ResourceSet(0.5, 10) + halfACore.subset(ResourceSet(0.5, 10)).value shouldBe ResourceSet(0.5, 10) + halfACore.subset(Cores(0.25), Cores(0.5), Memory(10)).value shouldBe ResourceSet(0.5, 10) + halfACore.subset(Cores(0.5), Cores(1), Memory(10)).value shouldBe ResourceSet(0.5, 10) + halfACore.subset(Cores(0.1), Cores(1), Memory(10)).value shouldBe ResourceSet(0.1, 10) + halfACore.subset(Cores(0.51), Cores(1), Memory(10)).isDefined shouldBe false + halfACore.subset(Cores(0.25), Cores(0.3), Memory(10)).value shouldBe ResourceSet(0.3, 10) + + val fiveAndAHalfCores = ResourceSet(5.5, 10) + fiveAndAHalfCores.subset(Cores(1.2), Cores(5.8), Memory(10)).value shouldBe ResourceSet(5.5, 10) + } } diff --git a/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala b/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala index bae6c9c7..9b4c5419 100644 --- a/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala +++ b/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala @@ -56,10 +56,9 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B override def beforeAll(): Unit = Logger.level = LogLevel.Fatal override def afterAll(): Unit = Logger.level = LogLevel.Info - def getDefaultTaskManager(sleepMilliseconds: Int = 10): TestTaskManager = new TaskManager( + def getDefaultTaskManager(): TestTaskManager = new TaskManager( taskManagerResources = SystemResources.infinite, - scriptsDirectory = None, - sleepMilliseconds = sleepMilliseconds + scriptsDirectory = None ) with TestTaskManager private def runSchedulerOnce(taskManager: TestTaskManager, @@ -97,8 +96,8 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B "TaskManager" should "not overwrite an existing task when adding a task, or throw an IllegalArgumentException when ignoreExists is false" in { val task: UnitTask = new ShellCommand("exit", "0") withName "exit 0" requires ResourceSet.empty val taskManager: TestTaskManager = getDefaultTaskManager() - taskManager.addTasks(tasks=Seq(task, task), ignoreExists=true) shouldBe List(0, 0) - an[IllegalArgumentException] should be thrownBy taskManager.addTask(task=task, enclosingNode=None, ignoreExists=false) + taskManager.addTasks(tasks=Seq(task, task), enclosingNode=None, ignoreExists=true) shouldBe List(0, 0) + an[IllegalArgumentException] should be thrownBy taskManager.addTasks(tasks=Seq(task), enclosingNode=None, ignoreExists=false) } it should "get the task status for only tracked tasks" in { @@ -190,7 +189,6 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B def runSimpleEndToEnd(task: UnitTask = new ShellCommand("exit", "0") withName "exit 0", simulate: Boolean): Unit = { val map: BiMap[Task, TaskExecutionInfo] = TaskManager.run( task = task, - sleepMilliseconds = 10, taskManagerResources = Some(SystemResources.infinite), scriptsDirectory = None, simulate = simulate, @@ -222,7 +220,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B val longTask: UnitTask = new ShellCommand("sleep", "1000") withName "sleep 1000" val failedTask: UnitTask = new ShellCommand("exit", "1") withName "exit 1" - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds=1) + val taskManager: TestTaskManager = getDefaultTaskManager() taskManager.addTasks(longTask, failedTask) taskManager.runToCompletion(failFast=true) taskManager.taskStatusFor(failedTask).value should be(TaskStatus.FAILED_COMMAND) @@ -233,7 +231,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B it should "not schedule and run tasks that have failed dependencies" in { val List(a, b, c) = List(0,1,0).map(c => new ShellCommand("exit", c.toString)) a ==> b ==> c - val tm = getDefaultTaskManager(sleepMilliseconds=1) + val tm = getDefaultTaskManager() tm.addTasks(a, b, c) tm.runToCompletion(failFast=false) tm.taskStatusFor(a).value shouldBe TaskStatus.SUCCEEDED @@ -247,7 +245,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B it should "not schedule and run tasks that have failed dependencies and complete all when failed tasks are manually succeeded" in { val List(a, b, c) = List(0,1,0).map(c => new ShellCommand("exit", c.toString)) a ==> b ==> c - val tm = getDefaultTaskManager(sleepMilliseconds=1) + val tm = getDefaultTaskManager() tm.addTasks(a, b, c) tm.runToCompletion(failFast=false) tm.taskStatusFor(a).value shouldBe TaskStatus.SUCCEEDED @@ -882,7 +880,6 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B TaskManager.run( new HungryPipeline, - sleepMilliseconds = 1, taskManagerResources = Some(SystemResources(systemCores, Resource.parseSizeToBytes("8g").toLong, 0.toLong)), failFast=true ) @@ -915,7 +912,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B } // add the tasks to the task manager - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1) + val taskManager: TestTaskManager = getDefaultTaskManager() taskManager.addTasks(tasks) // run the tasks @@ -948,7 +945,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B } // add the tasks to the task manager - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1) + val taskManager: TestTaskManager = getDefaultTaskManager() taskManager.addTasks(pipeline) // run the tasks @@ -987,7 +984,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B // NB: the execution is really: root ==> firstTask ==> secondTask // add the tasks to the task manager - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1) + val taskManager: TestTaskManager = getDefaultTaskManager() taskManager.addTasks(outerPipeline) // run the tasks @@ -1022,7 +1019,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B it should "mark a task as failed when one of its children fails" in { val parent = new ParentFailTask() - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1) + val taskManager: TestTaskManager = getDefaultTaskManager() taskManager.addTask(parent) taskManager.runToCompletion(failFast=true) Seq(parent.child, parent).foreach { task => @@ -1037,7 +1034,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B it should "mark a pipeline as failed when one of its children fails" in { val pipeline = new FailPipeline() - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1) + val taskManager: TestTaskManager = getDefaultTaskManager() taskManager.addTask(pipeline) taskManager.runToCompletion(failFast=true) Seq(pipeline.child, pipeline).foreach { task => @@ -1055,7 +1052,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B tasks += pipeline pipeline } - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1) + val taskManager: TestTaskManager = getDefaultTaskManager() taskManager.addTask(root) taskManager.runToCompletion(failFast=true) tasks.foreach { task => diff --git a/core/src/test/scala/dagr/core/execsystem/TopLikeStatusReporterTest.scala b/core/src/test/scala/dagr/core/execsystem/TopLikeStatusReporterTest.scala index 87564e80..5f7fd61e 100644 --- a/core/src/test/scala/dagr/core/execsystem/TopLikeStatusReporterTest.scala +++ b/core/src/test/scala/dagr/core/execsystem/TopLikeStatusReporterTest.scala @@ -57,10 +57,9 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with } } - private def getDefaultTaskManager(sleepMilliseconds: Int = 10): TaskManager = new TaskManager( + private def getDefaultTaskManager(): TaskManager = new TaskManager( taskManagerResources = SystemResources.infinite, - scriptsDirectory = None, - sleepMilliseconds = sleepMilliseconds + scriptsDirectory = None ) "Terminal" should "support ANSI codes" in { @@ -156,8 +155,7 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with val printMethod: String => Unit = (str: String) => output.append(str) val taskManager = new TaskManager( taskManagerResources = SystemResources(1.0, Long.MaxValue, Long.MaxValue), // one task at a time - scriptsDirectory = None, - sleepMilliseconds = 10 + scriptsDirectory = None ) val reporter = new TopLikeStatusReporter(taskManager = taskManager, print = printMethod) with TestTerminal diff --git a/core/src/test/scala/dagr/core/tasksystem/DependableTest.scala b/core/src/test/scala/dagr/core/tasksystem/DependableTest.scala index 69833e91..4988126b 100644 --- a/core/src/test/scala/dagr/core/tasksystem/DependableTest.scala +++ b/core/src/test/scala/dagr/core/tasksystem/DependableTest.scala @@ -125,13 +125,13 @@ class DependableTest extends UnitSpec { k.tasksDependedOn should contain theSameElementsAs Seq(i) } - "Pipeline.root" should "return the same things as Pipeline from the *tasks methods" in { + "Pipeline.root" should "return the same things as Pipeline from the *tasks methods*" in { val pipeline = new Pipeline() { override def build() = root ==> (A :: B :: C) ==> (X :: Y :: Z) } - pipeline.root.headTasks shouldBe pipeline.headTasks - pipeline.root.tailTasks shouldBe pipeline.tailTasks - pipeline.root.allTasks shouldBe pipeline.allTasks + pipeline.root.headTasks.toList should contain theSameElementsInOrderAs pipeline.headTasks + pipeline.root.tailTasks.toList should contain theSameElementsInOrderAs pipeline.tailTasks + pipeline.root.allTasks.toList should contain theSameElementsInOrderAs pipeline.allTasks } } diff --git a/tasks/src/test/scala/dagr/tasks/ScatterGatherTests.scala b/tasks/src/test/scala/dagr/tasks/ScatterGatherTests.scala index a74bceb9..c83798bf 100644 --- a/tasks/src/test/scala/dagr/tasks/ScatterGatherTests.scala +++ b/tasks/src/test/scala/dagr/tasks/ScatterGatherTests.scala @@ -46,7 +46,7 @@ import org.scalatest.BeforeAndAfterAll class ScatterGatherTests extends UnitSpec with LazyLogging with BeforeAndAfterAll { override def beforeAll(): Unit = Logger.level = LogLevel.Fatal override def afterAll(): Unit = Logger.level = LogLevel.Info - def buildTaskManager: TaskManager = new TaskManager(taskManagerResources = SystemResources.infinite, scriptsDirectory = None, sleepMilliseconds=1) + def buildTaskManager: TaskManager = new TaskManager(taskManagerResources = SystemResources.infinite, scriptsDirectory = None) def tmp(prefix: Option[String] = None): Path = { val path = Files.createTempFile(prefix.getOrElse("testScatterGather."), ".txt")