diff --git a/core/src/main/scala/dagr/core/execsystem/TaskManager.scala b/core/src/main/scala/dagr/core/execsystem/TaskManager.scala index f112bd63..d3046884 100644 --- a/core/src/main/scala/dagr/core/execsystem/TaskManager.scala +++ b/core/src/main/scala/dagr/core/execsystem/TaskManager.scala @@ -301,7 +301,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de if (updateNodeToCompleted) { logger.debug("processCompletedTask: Task [" + taskInfo.task.name + "] updating node to completed") if (TaskStatus.isTaskNotDone(taskInfo.status, failedIsDone = true)) { - throw new RuntimeException("Processing a completed task but it was not done!") + throw new RuntimeException(s"Processing a completed task but it was not done! status: ${taskInfo.status}") } completeGraphNode(node, Some(taskInfo)) } @@ -313,10 +313,17 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de */ private def updateCompletedTasks(): Map[TaskId, (Int, Boolean)] = { val completedTasks: Map[TaskId, (Int, Boolean)] = taskExecutionRunner.completedTasks() - completedTasks.keys.foreach(taskId => processCompletedTask(taskId)) - logger.debug("updateCompletedTasks: found " + completedTasks.size + " completed tasks") - for (taskId <- completedTasks.keys) { - val name = this(taskId).task.name + val emptyTasks = graphNodesInStateFor(GraphNodeState.NO_PREDECESSORS).filter(_.task.isInstanceOf[Task.EmptyTask]).toSeq + val completedTaskIds = completedTasks.keys ++ emptyTasks.map(_.taskId) + + emptyTasks.foreach { node => + node.taskInfo.status = TaskStatus.SUCCEEDED + logger.debug("updateCompletedTasks: empty task [" + node.task.name + "] completed") + } + + completedTaskIds.foreach { taskId => + processCompletedTask(taskId) + val name = this(taskId).task.name val status = this(taskId).taskInfo.status logger.debug("updateCompletedTasks: task [" + name + "] completed with task status [" + status + "]") } @@ -358,6 +365,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de logger.debug("updatePredecessors: examining task [" + node.task.name + "] for predecessors: " + node.hasPredecessor) // - if this node has already been expanded and now has no predecessors, then move it to the next state. // - if it hasn't been expanded and now has no predecessors, it should get expanded later + if (!node.hasPredecessor) logger.debug(s"updatePredecessors: has node state: ${node.state}") if (!node.hasPredecessor && node.state == ONLY_PREDECESSORS) { val taskInfo = node.taskInfo node.task match { @@ -390,15 +398,18 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de private def invokeGetTasks(node: GraphNode): Boolean = { // get the list of tasks that this task generates - val tasks = try { node.task.getTasks.toList } catch { case e: Exception => throw new TaskException(e, TaskStatus.FAILED_GET_TASKS) } - logger.debug(f"0 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}") + val tasks = try { node.task.getTasks.toSeq } catch { case e: Exception => throw new TaskException(e, TaskStatus.FAILED_GET_TASKS) } // NB: we don't create a new node for this task if it just returns itself // NB: always check for cycles, since we don't know when they could be introduced. We will check // for cycles in [[addTask]] so only check here if [[getTasks]] returns itself. tasks match { case Nil => // no tasks returned - throw new IllegalStateException(s"No tasks to schedule for task: [${node.task.name}]") + logger.debug(f"invokeGetTasks 1 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}") + // set the submission time stamp + node.taskInfo.submissionDate = Some(Instant.now()) + false case x :: Nil if x == node.task => // one task and it returned itself + logger.debug(f"invokeGetTasks 2 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}") // check for cycles only when we have a unit task for which calling [[getTasks] returns itself. checkForCycles(task = node.task) // verify we have a UnitTask @@ -408,13 +419,13 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de } false case _ => - logger.debug(f"3 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}") + logger.debug(f"invokeGetTasks 3 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}") // set the submission time stamp node.taskInfo.submissionDate = Some(Instant.now()) // we will make this task dependent on the tasks it creates... if (tasks.contains(node.task)) throw new IllegalStateException(s"Task [${node.task.name}] contained itself in the list returned by getTasks") // track the new tasks. If they are already added, that's fine too. - val taskIds: List[TaskId] = tasks.map { task => addTask(task = task, enclosingNode = Some(node), ignoreExists = true) } + val taskIds: Seq[TaskId] = tasks.map { task => addTask(task = task, enclosingNode = Some(node), ignoreExists = true) } // make this node dependent on those tasks taskIds.map(taskId => node.addPredecessors(this(taskId))) // we may need to update precedessors if a returned task was already completed @@ -442,11 +453,9 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de node.state = GraphNodeState.ONLY_PREDECESSORS taskInfo.status = TaskStatus.STARTED } - else { // must have predecessors, since `getTasks` did not return itself, and therefore we made this task dependent on others. - throw new IllegalStateException( - "Updating a non-UnitTask's state and status, but could not find any predecessors. " + - "Were tasks returned by its get tasks? " + - s"Task: [${node.task.name}]") + else { // if `getTasks` returned no tasks, then just update it to succeeded + node.state = NO_PREDECESSORS + taskInfo.status = TaskStatus.STARTED } } } @@ -512,12 +521,14 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de protected[core] def readyTasksList: List[UnitTask] = graphNodesInStateFor(NO_PREDECESSORS).toList.map(node => node.task.asInstanceOf[UnitTask]) override def stepExecution(): (Iterable[Task], Iterable[Task], Iterable[Task], Iterable[Task]) = { - logger.debug("runSchedulerOnce: starting one round of execution") + logger.debug("stepExecution: starting one round of execution") // get newly completed tasks val completedTasks = updateCompletedTasks() val canDoAnything = completedTasks.nonEmpty || taskExecutionRunner.runningTaskIds.isEmpty + logger.debug(s"stepExecution: canDoAnything=$canDoAnything") + if (canDoAnything) { // check if we now know about the predecessors for orphan tasks. updateOrphans() @@ -533,20 +544,22 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de val runningTasks: Map[UnitTask, ResourceSet] = runningTasksMap // get the tasks that are eligible for execution (tasks with no dependents) - val readyTasks: List[UnitTask] = graphNodesInStateFor(NO_PREDECESSORS).toList.map(node => node.task.asInstanceOf[UnitTask]) - logger.debug("runSchedulerOnce: found " + readyTasks.size + " readyTasks tasks") + val (emptyTasks: List[Task], readyTasks: List[Task]) = { + graphNodesInStateFor(NO_PREDECESSORS).map(_.task).toList.partition(_.isInstanceOf[Task.EmptyTask]) + } + logger.debug(s"stepExecution: found ${readyTasks.size} readyTasks tasks and ${emptyTasks.size} empty tasks") // get the list of tasks to schedule val tasksToSchedule: Map[UnitTask, ResourceSet] = if (!canDoAnything) Map.empty else { val tasks = scheduler.schedule( runningTasks = runningTasks, - readyTasks = readyTasks, + readyTasks = readyTasks.filter(_.isInstanceOf[UnitTask]).map(_.asInstanceOf[UnitTask]), systemCores = taskManagerResources.cores, systemMemory = taskManagerResources.systemMemory, jvmMemory = taskManagerResources.jvmMemory ) - logger.debug("runSchedulerOnce: scheduling " + tasks.size + " tasks") + logger.debug("stepExecution: scheduling " + tasks.size + " tasks") // add the given tasks to the task runner with the appropriate (as determined by the scheduler) resources. scheduleAndRunTasks(tasksToSchedule = tasks) @@ -555,13 +568,13 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de } // for debugging purposes - logger.debug("runSchedulerOnce: finishing one round of execution") - logger.debug("runSchedulerOnce: found " + runningTasks.size + " running tasks and " + tasksToSchedule.size + " tasks to schedule") + logger.debug("stepExecution: finishing one round of execution") + logger.debug("stepExecution: found " + runningTasks.size + " running tasks and " + tasksToSchedule.size + " tasks to schedule") ( readyTasks, tasksToSchedule.keys, - runningTasks.keys, + runningTasks.keys ++ emptyTasks, completedTasks.keys.map(taskId => this(taskId).task) ) } diff --git a/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala b/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala index 19033df1..910616a8 100644 --- a/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala +++ b/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala @@ -84,4 +84,7 @@ abstract class Pipeline(val outputDirectory: Option[Path] = None, /** True if we this pipeline is tracking this direct ancestor task, false otherwise. */ def contains(task: Task): Boolean = tasks.contains(task) + + /** Builds an empty task for use within this pipeline. */ + def emptyTask: Task = Task.empty } diff --git a/core/src/main/scala/dagr/core/tasksystem/Task.scala b/core/src/main/scala/dagr/core/tasksystem/Task.scala index 459158e9..38402212 100644 --- a/core/src/main/scala/dagr/core/tasksystem/Task.scala +++ b/core/src/main/scala/dagr/core/tasksystem/Task.scala @@ -33,6 +33,15 @@ import scala.util.control.Breaks._ /** Utility methods to aid in working with a task. */ object Task { + /** Marker trait for empty tasks. */ + sealed trait EmptyTask extends Task + + /** A task that does nothing. */ + def empty: Task = new EmptyTask { + name = "Task.empty" + override def getTasks: Iterable[_ <: Task] = None + } + /** Helper class for Tarjan's strongly connected components algorithm */ private class TarjanData { var index: Int = 0 diff --git a/core/src/test/scala/dagr/core/cmdline/DagrCoreMainTest.scala b/core/src/test/scala/dagr/core/cmdline/DagrCoreMainTest.scala index 631ff30d..b6a564a5 100644 --- a/core/src/test/scala/dagr/core/cmdline/DagrCoreMainTest.scala +++ b/core/src/test/scala/dagr/core/cmdline/DagrCoreMainTest.scala @@ -24,7 +24,7 @@ package dagr.core.cmdline -import dagr.core.cmdline.pipelines.PipelineFour +import dagr.core.cmdline.pipelines.{PipelineFour, PipelineBuildFailure} import dagr.core.tasksystem.{NoOpInJvmTask, Pipeline} import com.fulcrumgenomics.commons.io.Io import com.fulcrumgenomics.sopt.util.TermCode @@ -119,7 +119,7 @@ class DagrCoreMainTest extends UnitSpec with BeforeAndAfterAll with CaptureSyste } it should "print the execution failure upon failure" in { - val (_, _, _, log) = testParse(Array[String](nameOf(classOf[PipelineFour]))) + val (_, _, _, log) = testParse(Array[String](nameOf(classOf[PipelineBuildFailure]))) log should include("Elapsed time:") log should include("dagr failed") } diff --git a/core/src/test/scala/dagr/core/cmdline/pipelines/Pipelines.scala b/core/src/test/scala/dagr/core/cmdline/pipelines/Pipelines.scala index 1531827e..a5896f3f 100644 --- a/core/src/test/scala/dagr/core/cmdline/pipelines/Pipelines.scala +++ b/core/src/test/scala/dagr/core/cmdline/pipelines/Pipelines.scala @@ -57,3 +57,9 @@ private[cmdline] case class PipelineFour @clp(description = "", group = classOf[TestGroup], hidden = true) private[cmdline] case class PipelineWithMutex (@arg(mutex = Array("another")) var argument: String, @arg(mutex = Array("argument")) var another: String) extends CommandLineTaskTesting // argument should be required + +@clp(description = "", group = classOf[TestGroup], hidden = true) +private[cmdline] case class PipelineBuildFailure +(@arg var argument: String = "default", @arg var flag: Boolean = false) extends CommandLineTaskTesting { + override def build(): Unit = throw new IllegalStateException() +} diff --git a/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala b/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala index f8acab17..bae6c9c7 100644 --- a/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala +++ b/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala @@ -128,10 +128,39 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B tryTaskNTimes(taskManager = taskManager, task = task, numTimes = 1, taskIsDoneFinally = true, failedAreCompletedFinally = true) } + it should "run an empty task" in { + val task = Task.empty + val taskManager: TestTaskManager = getDefaultTaskManager() + taskManager.addTask(task) + val taskMap = taskManager.runToCompletion(true) + taskMap.size shouldBe 1 + val taskInfo = taskMap.valueFor(task).get + TaskStatus.isTaskDone(taskInfo.status, failedIsDone = false) shouldBe true + } + + it should "run an empty task as part of a pipeline" in { + val pipeline = new Pipeline() { + name = "Pipeline" + override def build(): Unit = { + def newTask(name: String) = new ShellCommand("exit", "0") withName "exit 0" requires ResourceSet.empty withName name + val middle = Task.empty + root ==> newTask("exit0-1") ==> middle ==> newTask("exit0-2") + } + } + + val taskManager: TestTaskManager = getDefaultTaskManager() + taskManager.addTask(pipeline) + val taskMap = taskManager.runToCompletion(true) + taskMap.size shouldBe 4 + taskMap.foreach { case (_, info) => + TaskStatus.isTaskDone(info.status, failedIsDone = false) shouldBe true + } + } + + it should "run a simple task that fails but we allow it" in { val task: UnitTask = new ShellCommand("exit", "1") withName "exit 1" val taskManager: TestTaskManager = getDefaultTaskManager() - tryTaskNTimes(taskManager = taskManager, task = task, numTimes = 1, taskIsDoneFinally = true, failedAreCompletedFinally = true) }