Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
A few NaiveScheduler simplifications
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 committed Mar 22, 2020
1 parent 056c442 commit df74f6b
Showing 1 changed file with 11 additions and 19 deletions.
30 changes: 11 additions & 19 deletions core/src/main/scala/dagr/core/execsystem/NaiveScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,15 @@ class NaiveScheduler extends Scheduler {
remainingSystemMemory: Memory,
remainingJvmMemory: Memory): Option[(UnitTask, ResourceSet)] = {
val systemResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingSystemMemory)
val jvmResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingJvmMemory)
val jvmResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingJvmMemory)
// Find the first task that can be executed
readyTasks
.view // lazy
.map { // pick resources
case task: ProcessTask => (task, task.pickResources(systemResourceSet))
case task: InJvmTask => (task, task.pickResources(jvmResourceSet))
}
.find { // find the first that returned a resource set
case (_, Some(resourceSet)) => true
case _ => false
}
.map { // get the resource set
case (task, Some(resourceSet)) => (task, resourceSet)
case _ => throw new IllegalStateException("BUG")
.view
.flatMap { // pick resources
case task: ProcessTask => task.pickResources(systemResourceSet).map { rs => (task, rs) }
case task: InJvmTask => task.pickResources(jvmResourceSet).map { rs => (task, rs) }
}
.headOption
}

/** Runs one round of scheduling, trying to schedule as many ready tasks as possible given the
Expand All @@ -67,30 +60,29 @@ class NaiveScheduler extends Scheduler {
private def scheduleOnce(readyTasks: Iterable[UnitTask],
remainingSystemCores: Cores,
remainingSystemMemory: Memory,
remainingJvmMemory: Memory): List[(UnitTask, ResourceSet)] = {
remainingJvmMemory: Memory): Seq[(UnitTask, ResourceSet)] = {
// no more tasks ready to be scheduled
if (readyTasks.isEmpty) Nil
if (readyTasks.isEmpty) Seq.empty
else {
logger.debug(s"the resources were [System cores=" + remainingSystemCores.value
+ " System memory=" + Resource.parseBytesToSize(remainingSystemMemory.value)
+ " JVM memory=" + Resource.parseBytesToSize(remainingJvmMemory.value) + " ]")

// try one round of scheduling, and recurse if a task could be scheduled
scheduleOneTask(readyTasks, remainingSystemCores, remainingSystemMemory, remainingJvmMemory) match {
case None =>
Nil
case None => Seq.empty
case Some((task: UnitTask, resourceSet: ResourceSet)) =>
logger.debug("task to schedule is [" + task.name + "]")
logger.debug(s"task [${task.name}] uses the following resources [" + resourceSet + "]")
List[(UnitTask, ResourceSet)]((task, resourceSet)) ++ (task match {
case processTask: ProcessTask =>
case _: ProcessTask =>
scheduleOnce(
readyTasks = readyTasks.filterNot(t => t == task),
remainingSystemCores = remainingSystemCores - resourceSet.cores,
remainingSystemMemory = remainingSystemMemory - resourceSet.memory,
remainingJvmMemory = remainingJvmMemory
)
case inJvmTask: InJvmTask =>
case _: InJvmTask =>
scheduleOnce(
readyTasks = readyTasks.filterNot(t => t == task),
remainingSystemCores = remainingSystemCores - resourceSet.cores,
Expand Down

0 comments on commit df74f6b

Please sign in to comment.