Clean up mill.eval even more (#2542)
1. `Terminal` goes in its own file
2. Split out `EvaluatorCore` (which contains only evaluation-related
stuff) from `EvaluatorImpl` (which contains some user-facing helpers)
3. Split out `GroupEvaluator` from `EvaluatorCore`
4. Split out `Plan`, `ThreadNumberer`, `JsonArrayLogger`
5. Unify the implementations of `mill-profile.json` and
`mill-par-profile.json` (renamed to `mill-chrome-profile.json`), so both
stream their logs as evaluation progresses, in a thread-safe manner
(should fix #2540)
6. Add `dependencies` to `mill-profile.json`, making it more useful: you can
now trace up and down the build graph without having to run `mill inspect`
(which would wipe out the `mill-profile.json` you were just looking at!). A
sketch of the resulting entry format follows.
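
For illustration, a sketch of what one entry in the new `mill-profile.json` plausibly looks like, inferred from the `ProfileLogger.Timing(...)` call in `EvaluatorCore` below; the JSON field names and the task names here are assumptions, not taken from this diff:

{
  "label": "foo.compile",
  "millis": 142,
  "cached": false,
  "dependencies": ["foo.sources", "foo.compileClasspath"]
}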
lihaoyi authored May 24, 2023
1 parent 24de8f4 commit e153520
Showing 12 changed files with 805 additions and 725 deletions.
2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/Intro_to_Mill.adoc
@@ -99,7 +99,7 @@ mill -j 4 __.compile
To use as many threads as your machine has (logical) processor cores, use `--jobs 0`.
To disable parallel execution use `--jobs 1`. This is currently the default.

-`mill -j` generates an output file in `out/mill-par-profile.json` that can be
+`mill -j` generates an output file in `out/mill-chrome-profile.json` that can be
loaded into the Chrome browser's `chrome://tracing` page for visualization.
This can make it much easier to analyze your parallel runs to find out what's
taking the most time:
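
As background, `chrome://tracing` reads the standard Trace Event Format. One event in `mill-chrome-profile.json` plausibly looks like the line below: the `"cat": "job"` value matches the `ChromeProfileLogger.log(cat = "job")` call added in this commit, while the remaining field values and the task name are made-up assumptions:

{"name": "foo.compile", "cat": "job", "ph": "X", "ts": 1684900000000, "dur": 142000, "pid": 1, "tid": 3}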
2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/Out_Dir.adoc
@@ -106,7 +106,7 @@ There are also top-level build-related files in the `out/` folder, prefixed as `
Probably the most useful file for you. It logs the tasks run and time taken for the last Mill command you executed.
This is very useful if Mill is being unexpectedly slow, and you want to find out exactly what tasks are being run.

-`mill-par-profile.json`::
+`mill-chrome-profile.json`::
This file is only written if you run Mill in parallel mode, e.g. `mill --jobs 4`. This file can be opened in Google Chrome with the built-in `tracing:` protocol even while Mill is still running, so you get a nice chart of what's going on in parallel.

`mill-worker-*/`::
25 changes: 4 additions & 21 deletions main/eval/src/mill/eval/Evaluator.scala
@@ -10,6 +10,9 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.reflect.ClassTag
import scala.util.DynamicVariable

+/**
+ * Public facing API of the Mill evaluation logic.
+ */
trait Evaluator {
  def baseLogger: ColorLogger
  def rootModule: BaseModule
@@ -28,7 +31,7 @@ trait Evaluator {
  def withBaseLogger(newBaseLogger: ColorLogger): Evaluator
  def withFailFast(newFailFast: Boolean): Evaluator

-  def plan(goals: Agg[Task[_]]): (MultiBiMap[Evaluator.Terminal, Task[_]], Agg[Task[_]])
+  def plan(goals: Agg[Task[_]]): (MultiBiMap[Terminal, Task[_]], Agg[Task[_]])

  /**
   * Evaluate given task(s) and return the successful result(s), or throw an exception.
@@ -40,26 +43,6 @@ trait Evaluator {
}

object Evaluator {
-
-  /**
-   * A terminal or terminal target is some important work unit, that in most cases has a name (Right[Labelled])
-   * or was directly called by the user (Left[Task]).
-   * It's a T, T.worker, T.input, T.source, T.sources, T.persistent
-   */
-  sealed trait Terminal {
-    def render: String
-  }
-
-  object Terminal {
-    case class Labelled[T](task: NamedTask[T], segments: Segments) extends Terminal {
-      def render = segments.render
-    }
-
-    case class Task[T](task: mill.define.Task[_]) extends Terminal {
-      def render = task.toString
-    }
-  }

  trait Results {
    def rawValues: Seq[Result[Val]]
    def evaluated: Agg[Task[_]]
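
Per item 1 of the commit message, the `Terminal` definitions removed above move into their own file. A reference sketch follows, assuming the definitions moved verbatim; the exact file location, the import list, and the `printTerm` helper (which `EvaluatorCore` calls below but whose implementation is not shown in this diff) are assumptions:

package mill.eval

import mill.define.{NamedTask, Segments}

/**
 * A terminal or terminal target is some important work unit that in most
 * cases has a name (Labelled) or was directly called by the user (Task).
 */
sealed trait Terminal {
  def render: String
}

object Terminal {
  case class Labelled[T](task: NamedTask[T], segments: Segments) extends Terminal {
    def render = segments.render
  }

  case class Task[T](task: mill.define.Task[_]) extends Terminal {
    def render = task.toString
  }

  // Called from EvaluatorCore below; a minimal plausible implementation.
  def printTerm(term: Terminal): String = term.render
}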
200 changes: 200 additions & 0 deletions main/eval/src/mill/eval/EvaluatorCore.scala
@@ -0,0 +1,200 @@
package mill.eval

import mill.api.Result.{Aborted, Failing}
import mill.api.Strict.Agg
import mill.api.{Ctx, _}
import mill.define._
import mill.eval.Evaluator.TaskResult
import mill.util._

import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.collection.mutable
import scala.concurrent._

/**
 * Core logic of evaluating tasks, without any user-facing helper methods
 */
private[mill] trait EvaluatorCore extends GroupEvaluator {

  def baseLogger: ColorLogger

  /**
   * @param goals The tasks that need to be evaluated
   * @param reporter A function that will accept a module id and provide a listener for build problems in that module
   * @param testReporter Listener for test events like start, finish with success/error
   */
  def evaluate(
      goals: Agg[Task[_]],
      reporter: Int => Option[CompileProblemReporter] = _ => Option.empty[CompileProblemReporter],
      testReporter: TestReporter = DummyTestReporter,
      logger: ColorLogger = baseLogger
  ): Evaluator.Results = {
    os.makeDir.all(outPath)

    PathRef.validatedPaths.withValue(new PathRef.ValidatedPaths()) {
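      // Assumption based on the ExecutionContexts naming: a single-threaded run
      // executes tasks inline on the calling thread, while higher thread counts
      // schedule work onto a fixed-size pool.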
      val ec =
        if (effectiveThreadCount == 1) ExecutionContexts.RunNow
        else new ExecutionContexts.ThreadPool(effectiveThreadCount)

      def contextLoggerMsg(threadId: Int) =
        if (effectiveThreadCount == 1) ""
        else s"[#${if (effectiveThreadCount > 9) f"$threadId%02d" else threadId}] "

      try evaluate0(goals, logger, reporter, testReporter, ec, contextLoggerMsg)
      finally ec.close()
    }
  }

  private def getFailing(
      sortedGroups: MultiBiMap[Terminal, Task[_]],
      results: Map[Task[_], Evaluator.TaskResult[(Val, Int)]]
  ): MultiBiMap.Mutable[Terminal, Failing[Val]] = {
    val failing = new MultiBiMap.Mutable[Terminal, Result.Failing[Val]]
    for ((k, vs) <- sortedGroups.items()) {
      val failures = vs.items.flatMap(results.get).collect {
        case Evaluator.TaskResult(f: Result.Failing[(Val, Int)], _) => f.map(_._1)
      }

      failing.addAll(k, Loose.Agg.from(failures))
    }
    failing
  }

  private def evaluate0(
      goals: Agg[Task[_]],
      logger: ColorLogger,
      reporter: Int => Option[CompileProblemReporter] = _ => Option.empty[CompileProblemReporter],
      testReporter: TestReporter = DummyTestReporter,
      ec: ExecutionContext with AutoCloseable,
      contextLoggerMsg: Int => String
  ): Evaluator.Results = {
    implicit val implicitEc = ec

    os.makeDir.all(outPath)
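    // Both profile files are written incrementally as evaluation progresses,
    // in a thread-safe manner (commit message item 5), rather than once at the end.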
    val chromeProfileLogger = new ChromeProfileLogger(outPath / "mill-chrome-profile.json")
    val profileLogger = new ProfileLogger(outPath / "mill-profile.json")
    val threadNumberer = new ThreadNumberer()
    val (sortedGroups, transitive) = Plan.plan(goals)
    val interGroupDeps = findInterGroupDeps(sortedGroups)
    val terminals = sortedGroups.keys().toVector
    val failed = new AtomicBoolean(false)
    val count = new AtomicInteger(1)

    val futures = mutable.Map.empty[Terminal, Future[Option[GroupEvaluator.Results]]]

    // We walk the task graph in topological order and schedule the futures
    // to run asynchronously. During this walk, we store the scheduled futures
    // in a dictionary. When scheduling each future, we are guaranteed that the
    // necessary upstream futures will have already been scheduled and stored,
    // due to the topological order of traversal.
    for (terminal <- terminals) {
      val deps = interGroupDeps(terminal)
      futures(terminal) = Future.sequence(deps.map(futures)).map { upstreamValues =>
        if (failed.get()) None
        else {
          val upstreamResults = upstreamValues
            .iterator
            .flatMap(_.iterator.flatMap(_.newResults))
            .toMap

          val startTime = System.currentTimeMillis()
          val threadId = threadNumberer.getThreadId(Thread.currentThread())
          val counterMsg = s"${count.getAndIncrement()}/${terminals.size}"
          val contextLogger = PrefixLogger(
            out = logger,
            context = contextLoggerMsg(threadId),
            tickerContext = GroupEvaluator.dynamicTickerPrefix.value
          )

          val res = evaluateGroupCached(
            terminal = terminal,
            group = sortedGroups.lookupKey(terminal),
            results = upstreamResults,
            counterMsg = counterMsg,
            zincProblemReporter = reporter,
            testReporter = testReporter,
            logger = contextLogger
          )

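          // In fail-fast mode, record the first failure; futures scheduled later
          // then short-circuit via the failed.get() check above and return None.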
          if (failFast && res.newResults.values.exists(_.result.asSuccess.isEmpty))
            failed.set(true)

          val endTime = System.currentTimeMillis()

          chromeProfileLogger.log(
            task = Terminal.printTerm(terminal),
            cat = "job",
            startTime = startTime,
            endTime = endTime,
            threadId = threadNumberer.getThreadId(Thread.currentThread()),
            cached = res.cached
          )

          profileLogger.log(
            ProfileLogger.Timing(
              terminal.render,
              (endTime - startTime).toInt,
              res.cached,
              deps.map(_.render)
            )
          )

          Some(res)
        }
      }
    }

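    // Block until every scheduled future has completed, then flatten the
    // per-group results back into a single per-task map.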
    val finishedOptsMap = terminals
      .map(t => (t, Await.result(futures(t), duration.Duration.Inf)))
      .toMap

    val results0: Vector[(Task[_], TaskResult[(Val, Int)])] = terminals
      .flatMap { t =>
        sortedGroups.lookupKey(t).flatMap { t0 =>
          finishedOptsMap(t) match {
            case None => Some((t0, TaskResult(Aborted, () => Aborted)))
            case Some(res) => res.newResults.get(t0).map(r => (t0, r))
          }
        }
      }

    val results: Map[Task[_], TaskResult[(Val, Int)]] = results0.toMap

    chromeProfileLogger.close()
    profileLogger.close()

    EvaluatorCore.Results(
      goals.indexed.map(results(_).map(_._1).result),
      finishedOptsMap.map(_._2).flatMap(_.toSeq.flatMap(_.newEvaluated)),
      transitive,
      getFailing(sortedGroups, results),
      results.map { case (k, v) => (k, v.map(_._1)) }
    )
  }

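  // For each terminal, find the terminals owning its group's upstream tasks:
  // collect every input of the group's tasks, drop those that live inside the
  // group itself, and resolve the remainder to their owning terminals.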
  private def findInterGroupDeps(sortedGroups: MultiBiMap[Terminal, Task[_]])
      : Map[Terminal, Seq[Terminal]] = {
    sortedGroups
      .items()
      .map { case (terminal, group) =>
        terminal -> Seq.from(group)
          .flatMap(_.inputs)
          .filterNot(group.contains)
          .distinct
          .map(sortedGroups.lookupValue)
          .distinct
      }
      .toMap
  }
}

private[mill] object EvaluatorCore {

  case class Results(
      rawValues: Seq[Result[Val]],
      evaluated: Agg[Task[_]],
      transitive: Agg[Task[_]],
      failing: MultiBiMap[Terminal, Result.Failing[Val]],
      results: Map[Task[_], TaskResult[Val]]
  ) extends Evaluator.Results
}