Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Add support for empty tasks #360

Merged
merged 1 commit into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 36 additions & 23 deletions core/src/main/scala/dagr/core/execsystem/TaskManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
if (updateNodeToCompleted) {
logger.debug("processCompletedTask: Task [" + taskInfo.task.name + "] updating node to completed")
if (TaskStatus.isTaskNotDone(taskInfo.status, failedIsDone = true)) {
throw new RuntimeException("Processing a completed task but it was not done!")
throw new RuntimeException(s"Processing a completed task but it was not done! status: ${taskInfo.status}")
}
completeGraphNode(node, Some(taskInfo))
}
Expand All @@ -313,10 +313,17 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
*/
private def updateCompletedTasks(): Map[TaskId, (Int, Boolean)] = {
val completedTasks: Map[TaskId, (Int, Boolean)] = taskExecutionRunner.completedTasks()
completedTasks.keys.foreach(taskId => processCompletedTask(taskId))
logger.debug("updateCompletedTasks: found " + completedTasks.size + " completed tasks")
for (taskId <- completedTasks.keys) {
val name = this(taskId).task.name
val emptyTasks = graphNodesInStateFor(GraphNodeState.NO_PREDECESSORS).filter(_.task.isInstanceOf[Task.EmptyTask]).toSeq
val completedTaskIds = completedTasks.keys ++ emptyTasks.map(_.taskId)

emptyTasks.foreach { node =>
node.taskInfo.status = TaskStatus.SUCCEEDED
logger.debug("updateCompletedTasks: empty task [" + node.task.name + "] completed")
}

completedTaskIds.foreach { taskId =>
processCompletedTask(taskId)
val name = this(taskId).task.name
val status = this(taskId).taskInfo.status
logger.debug("updateCompletedTasks: task [" + name + "] completed with task status [" + status + "]")
}
Expand Down Expand Up @@ -358,6 +365,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
logger.debug("updatePredecessors: examining task [" + node.task.name + "] for predecessors: " + node.hasPredecessor)
// - if this node has already been expanded and now has no predecessors, then move it to the next state.
// - if it hasn't been expanded and now has no predecessors, it should get expanded later
if (!node.hasPredecessor) logger.debug(s"updatePredecessors: has node state: ${node.state}")
if (!node.hasPredecessor && node.state == ONLY_PREDECESSORS) {
val taskInfo = node.taskInfo
node.task match {
Expand Down Expand Up @@ -390,15 +398,18 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
private def invokeGetTasks(node: GraphNode): Boolean = {

// get the list of tasks that this task generates
val tasks = try { node.task.getTasks.toList } catch { case e: Exception => throw new TaskException(e, TaskStatus.FAILED_GET_TASKS) }
logger.debug(f"0 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}")
val tasks = try { node.task.getTasks.toSeq } catch { case e: Exception => throw new TaskException(e, TaskStatus.FAILED_GET_TASKS) }
// NB: we don't create a new node for this task if it just returns itself
// NB: always check for cycles, since we don't know when they could be introduced. We will check
// for cycles in [[addTask]] so only check here if [[getTasks]] returns itself.
tasks match {
case Nil => // no tasks returned
throw new IllegalStateException(s"No tasks to schedule for task: [${node.task.name}]")
logger.debug(f"invokeGetTasks 1 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}")
// set the submission time stamp
node.taskInfo.submissionDate = Some(Instant.now())
false
case x :: Nil if x == node.task => // one task and it returned itself
logger.debug(f"invokeGetTasks 2 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}")
// check for cycles only when we have a unit task for which calling [[getTasks]] returns itself.
checkForCycles(task = node.task)
// verify we have a UnitTask
Expand All @@ -408,13 +419,13 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
}
false
case _ =>
logger.debug(f"3 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}")
logger.debug(f"invokeGetTasks 3 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}")
// set the submission time stamp
node.taskInfo.submissionDate = Some(Instant.now())
// we will make this task dependent on the tasks it creates...
if (tasks.contains(node.task)) throw new IllegalStateException(s"Task [${node.task.name}] contained itself in the list returned by getTasks")
// track the new tasks. If they are already added, that's fine too.
val taskIds: List[TaskId] = tasks.map { task => addTask(task = task, enclosingNode = Some(node), ignoreExists = true) }
val taskIds: Seq[TaskId] = tasks.map { task => addTask(task = task, enclosingNode = Some(node), ignoreExists = true) }
// make this node dependent on those tasks
taskIds.map(taskId => node.addPredecessors(this(taskId)))
// we may need to update predecessors if a returned task was already completed
Expand Down Expand Up @@ -442,11 +453,9 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
node.state = GraphNodeState.ONLY_PREDECESSORS
taskInfo.status = TaskStatus.STARTED
}
else { // must have predecessors, since `getTasks` did not return itself, and therefore we made this task dependent on others.
throw new IllegalStateException(
"Updating a non-UnitTask's state and status, but could not find any predecessors. " +
"Were tasks returned by its get tasks? " +
s"Task: [${node.task.name}]")
else { // if `getTasks` returned no tasks, mark it as started with no predecessors; empty tasks are marked as succeeded in `updateCompletedTasks`
node.state = NO_PREDECESSORS
taskInfo.status = TaskStatus.STARTED
}
}
}
Expand Down Expand Up @@ -512,12 +521,14 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
/** The unit tasks that are ready for execution: the tasks of all graph nodes in the NO_PREDECESSORS state.
  *
  * Nodes in that state whose task is not a `UnitTask` (for example a `Task.EmptyTask`) are skipped instead
  * of being cast, since casting them would throw a `ClassCastException`.
  */
protected[core] def readyTasksList: List[UnitTask] = graphNodesInStateFor(NO_PREDECESSORS).toList.map(_.task).collect { case task: UnitTask => task }

override def stepExecution(): (Iterable[Task], Iterable[Task], Iterable[Task], Iterable[Task]) = {
logger.debug("runSchedulerOnce: starting one round of execution")
logger.debug("stepExecution: starting one round of execution")

// get newly completed tasks
val completedTasks = updateCompletedTasks()
val canDoAnything = completedTasks.nonEmpty || taskExecutionRunner.runningTaskIds.isEmpty

logger.debug(s"stepExecution: canDoAnything=$canDoAnything")

if (canDoAnything) {
// check if we now know about the predecessors for orphan tasks.
updateOrphans()
Expand All @@ -533,20 +544,22 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
val runningTasks: Map[UnitTask, ResourceSet] = runningTasksMap

// get the tasks that are eligible for execution (tasks with no predecessors)
val readyTasks: List[UnitTask] = graphNodesInStateFor(NO_PREDECESSORS).toList.map(node => node.task.asInstanceOf[UnitTask])
logger.debug("runSchedulerOnce: found " + readyTasks.size + " readyTasks tasks")
val (emptyTasks: List[Task], readyTasks: List[Task]) = {
graphNodesInStateFor(NO_PREDECESSORS).map(_.task).toList.partition(_.isInstanceOf[Task.EmptyTask])
}
logger.debug(s"stepExecution: found ${readyTasks.size} readyTasks tasks and ${emptyTasks.size} empty tasks")

// get the list of tasks to schedule
val tasksToSchedule: Map[UnitTask, ResourceSet] = if (!canDoAnything) Map.empty else {
val tasks = scheduler.schedule(
runningTasks = runningTasks,
readyTasks = readyTasks,
readyTasks = readyTasks.filter(_.isInstanceOf[UnitTask]).map(_.asInstanceOf[UnitTask]),
systemCores = taskManagerResources.cores,
systemMemory = taskManagerResources.systemMemory,
jvmMemory = taskManagerResources.jvmMemory
)

logger.debug("runSchedulerOnce: scheduling " + tasks.size + " tasks")
logger.debug("stepExecution: scheduling " + tasks.size + " tasks")

// add the given tasks to the task runner with the appropriate (as determined by the scheduler) resources.
scheduleAndRunTasks(tasksToSchedule = tasks)
Expand All @@ -555,13 +568,13 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
}

// for debugging purposes
logger.debug("runSchedulerOnce: finishing one round of execution")
logger.debug("runSchedulerOnce: found " + runningTasks.size + " running tasks and " + tasksToSchedule.size + " tasks to schedule")
logger.debug("stepExecution: finishing one round of execution")
logger.debug("stepExecution: found " + runningTasks.size + " running tasks and " + tasksToSchedule.size + " tasks to schedule")

(
readyTasks,
tasksToSchedule.keys,
runningTasks.keys,
runningTasks.keys ++ emptyTasks,
completedTasks.keys.map(taskId => this(taskId).task)
)
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/dagr/core/tasksystem/Pipeline.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,7 @@ abstract class Pipeline(val outputDirectory: Option[Path] = None,

/** True if this pipeline is tracking this direct ancestor task, false otherwise. */
def contains(task: Task): Boolean = tasks.contains(task)

/** Builds an empty task for use within this pipeline; each call returns a new task from `Task.empty`. */
def emptyTask: Task = Task.empty
}
9 changes: 9 additions & 0 deletions core/src/main/scala/dagr/core/tasksystem/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ import scala.util.control.Breaks._
/** Utility methods to aid in working with a task. */
object Task {

/** Marker trait for empty tasks. */
sealed trait EmptyTask extends Task

/** Creates a task that does nothing: it is named "Task.empty" and its `getTasks` returns no tasks.
  *
  * Each call returns a new [[EmptyTask]] instance.
  */
def empty: Task = new EmptyTask {
  name = "Task.empty"
  // Return an explicitly empty collection rather than relying on the implicit
  // Option-to-Iterable conversion of `None`.
  override def getTasks: Iterable[_ <: Task] = Nil
}

/** Helper class for Tarjan's strongly connected components algorithm */
private class TarjanData {
var index: Int = 0
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/dagr/core/cmdline/DagrCoreMainTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

package dagr.core.cmdline

import dagr.core.cmdline.pipelines.PipelineFour
import dagr.core.cmdline.pipelines.{PipelineFour, PipelineBuildFailure}
import dagr.core.tasksystem.{NoOpInJvmTask, Pipeline}
import com.fulcrumgenomics.commons.io.Io
import com.fulcrumgenomics.sopt.util.TermCode
Expand Down Expand Up @@ -119,7 +119,7 @@ class DagrCoreMainTest extends UnitSpec with BeforeAndAfterAll with CaptureSyste
}

it should "print the execution failure upon failure" in {
val (_, _, _, log) = testParse(Array[String](nameOf(classOf[PipelineFour])))
val (_, _, _, log) = testParse(Array[String](nameOf(classOf[PipelineBuildFailure])))
log should include("Elapsed time:")
log should include("dagr failed")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,9 @@ private[cmdline] case class PipelineFour
@clp(description = "", group = classOf[TestGroup], hidden = true)
private[cmdline] case class PipelineWithMutex
(@arg(mutex = Array("another")) var argument: String, @arg(mutex = Array("argument")) var another: String) extends CommandLineTaskTesting // argument should be required

/** Command-line test pipeline whose `build()` unconditionally throws an `IllegalStateException`. */
@clp(description = "", group = classOf[TestGroup], hidden = true)
private[cmdline] case class PipelineBuildFailure
(@arg var argument: String = "default", @arg var flag: Boolean = false) extends CommandLineTaskTesting {
// Always fails, so building this pipeline raises an exception.
override def build(): Unit = throw new IllegalStateException()
}
31 changes: 30 additions & 1 deletion core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,39 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B
tryTaskNTimes(taskManager = taskManager, task = task, numTimes = 1, taskIsDoneFinally = true, failedAreCompletedFinally = true)
}

it should "run an empty task" in {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be nice to include a test that has a small pipeline of realTask ==> Task.empty ==> anotherRealTask.

val task = Task.empty
val taskManager: TestTaskManager = getDefaultTaskManager()
taskManager.addTask(task)
val taskMap = taskManager.runToCompletion(true)
taskMap.size shouldBe 1
val taskInfo = taskMap.valueFor(task).get
TaskStatus.isTaskDone(taskInfo.status, failedIsDone = false) shouldBe true
}

it should "run an empty task as part of a pipeline" in {
val pipeline = new Pipeline() {
name = "Pipeline"
override def build(): Unit = {
def newTask(name: String) = new ShellCommand("exit", "0") withName "exit 0" requires ResourceSet.empty withName name
val middle = Task.empty
root ==> newTask("exit0-1") ==> middle ==> newTask("exit0-2")
}
}

val taskManager: TestTaskManager = getDefaultTaskManager()
taskManager.addTask(pipeline)
val taskMap = taskManager.runToCompletion(true)
taskMap.size shouldBe 4
taskMap.foreach { case (_, info) =>
TaskStatus.isTaskDone(info.status, failedIsDone = false) shouldBe true
}
}


it should "run a simple task that fails but we allow it" in {
val task: UnitTask = new ShellCommand("exit", "1") withName "exit 1"
val taskManager: TestTaskManager = getDefaultTaskManager()

tryTaskNTimes(taskManager = taskManager, task = task, numTimes = 1, taskIsDoneFinally = true, failedAreCompletedFinally = true)
}

Expand Down