Skip to content
This repository was archived by the owner on Jun 20, 2024. It is now read-only.

Various Task Manage Speedups #373

Merged
merged 10 commits into from
Mar 25, 2020
7 changes: 5 additions & 2 deletions core/src/main/scala/dagr/core/execsystem/GraphNode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class GraphNode(var task: Task,
var state: GraphNodeState.Value = GraphNodeState.PREDECESSORS_AND_UNEXPANDED,
val enclosingNode: Option[GraphNode] = None) extends BaseGraphNode {

private val _predecessors = new ListBuffer[GraphNode]()
private val _predecessors = new scala.collection.mutable.LinkedHashSet[GraphNode]()

_predecessors ++= predecessorNodes

Expand Down Expand Up @@ -93,9 +93,12 @@ class GraphNode(var task: Task,
addPredecessors(predecessor.toSeq:_*)
}

/** Gets the number of predecessors */
def numPredecessors: Int = _predecessors.size

/** Get the predecessors
*
* @return the current set of predecessors, if any
*/
def predecessors: List[GraphNode] = _predecessors.toList
def predecessors: Iterable[GraphNode] = _predecessors
}
30 changes: 11 additions & 19 deletions core/src/main/scala/dagr/core/execsystem/NaiveScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,15 @@ class NaiveScheduler extends Scheduler {
remainingSystemMemory: Memory,
remainingJvmMemory: Memory): Option[(UnitTask, ResourceSet)] = {
val systemResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingSystemMemory)
val jvmResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingJvmMemory)
val jvmResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingJvmMemory)
// Find the first task that can be executed
readyTasks
.view // lazy
.map { // pick resources
case task: ProcessTask => (task, task.pickResources(systemResourceSet))
case task: InJvmTask => (task, task.pickResources(jvmResourceSet))
}
.find { // find the first that returned a resource set
case (_, Some(resourceSet)) => true
case _ => false
}
.map { // get the resource set
case (task, Some(resourceSet)) => (task, resourceSet)
case _ => throw new IllegalStateException("BUG")
.view
.flatMap { // pick resources
case task: ProcessTask => task.pickResources(systemResourceSet).map { rs => (task, rs) }
case task: InJvmTask => task.pickResources(jvmResourceSet).map { rs => (task, rs) }
}
.headOption
}

/** Runs one round of scheduling, trying to schedule as many ready tasks as possible given the
Expand All @@ -67,30 +60,29 @@ class NaiveScheduler extends Scheduler {
private def scheduleOnce(readyTasks: Iterable[UnitTask],
remainingSystemCores: Cores,
remainingSystemMemory: Memory,
remainingJvmMemory: Memory): List[(UnitTask, ResourceSet)] = {
remainingJvmMemory: Memory): Seq[(UnitTask, ResourceSet)] = {
// no more tasks ready to be scheduled
if (readyTasks.isEmpty) Nil
if (readyTasks.isEmpty) Seq.empty
else {
logger.debug(s"the resources were [System cores=" + remainingSystemCores.value
+ " System memory=" + Resource.parseBytesToSize(remainingSystemMemory.value)
+ " JVM memory=" + Resource.parseBytesToSize(remainingJvmMemory.value) + " ]")

// try one round of scheduling, and recurse if a task could be scheduled
scheduleOneTask(readyTasks, remainingSystemCores, remainingSystemMemory, remainingJvmMemory) match {
case None =>
Nil
case None => Seq.empty
case Some((task: UnitTask, resourceSet: ResourceSet)) =>
logger.debug("task to schedule is [" + task.name + "]")
logger.debug(s"task [${task.name}] uses the following resources [" + resourceSet + "]")
List[(UnitTask, ResourceSet)]((task, resourceSet)) ++ (task match {
case processTask: ProcessTask =>
case _: ProcessTask =>
scheduleOnce(
readyTasks = readyTasks.filterNot(t => t == task),
remainingSystemCores = remainingSystemCores - resourceSet.cores,
remainingSystemMemory = remainingSystemMemory - resourceSet.memory,
remainingJvmMemory = remainingJvmMemory
)
case inJvmTask: InJvmTask =>
case _: InJvmTask =>
scheduleOnce(
readyTasks = readyTasks.filterNot(t => t == task),
remainingSystemCores = remainingSystemCores - resourceSet.cores,
Expand Down
32 changes: 27 additions & 5 deletions core/src/main/scala/dagr/core/execsystem/ResourceSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,35 @@ case class ResourceSet(cores: Cores = Cores.none, memory: Memory = Memory.none)

/**
* Constructs a subset of this resource set with a fixed amount of memory and a variable
* number of cores. Will greedily assign the highest number of cores possible.
* number of cores. Will greedily assign the highest number of cores possible. If the maximum cores is a whole
* number, then a whole number will be returned, unless the minimum cores (which can be a fractional number of cores)
* is the only valid value. If the maximum cores is not a whole number, then the maximum fractional amount will be
* returned.
*
* Example 1: minCores=1, maxCores=5, this.cores=4.5, then 4 cores will be returned.
* Example 2: minCores=1, maxCores=5.1, this.cores=4.5, then 4.5 cores will be returned.
* Example 3: minCores=1, maxCores=5, this.cores=1.5, then 1 core will be returned.
* Example 4: minCores=1.5, maxCores=5, this.cores=1.5, then 1.5 cores will be returned.
*/
def subset(minCores: Cores, maxCores: Cores, memory: Memory) : Option[ResourceSet] = {
val min = minCores.value
val max = maxCores.value
val cores = Range.BigDecimal.inclusive(max, min, -1).find(cores => subset(Cores(cores.doubleValue), memory).isDefined)
cores.map(c => ResourceSet(Cores(c.doubleValue), memory))
if (!subsettable(ResourceSet(minCores, memory))) None else {
val coresValue = {
// Try to return a whole number value if maxCores is a whole number. If no whole number exists that is greater
// than or equal to minCores, then just use minCores (which could be fractional). If maxCores is fractional,
// then return a fractional value.
if (maxCores.value.isValidInt) {
// Get the number of cores, but rounded down to get a whole number value
val minValue = Math.floor(Math.min(this.cores.value, maxCores.value))
// If the number rounded down is smaller than the min-cores, then just return the min-cores
if (minValue < minCores.value) minCores.value else minValue
} else { // any fractional number will do
Math.min(this.cores.value, maxCores.value)
}
}
val resourceSet = ResourceSet(Cores(coresValue), memory)
require(subsettable(resourceSet))
Some(resourceSet)
}
}

/**
Expand Down
Loading