From e153520eac4c0a16eec57e9c79b131b8bb41f6ff Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Tue, 23 May 2023 21:50:41 -0700 Subject: [PATCH] Clean up `mill.eval` even more (#2542) 1. `Terminal` goes in its own file 2. Split out `EvaluatorCore` (which contains only evaluation-related stuff) from `EvaluatorImpl` (which contains some user-facing helpers) 3. Split out `GroupEvaluator` from `EvaluatorCore` 4. Split out `Plan`, `ThreadNumberer`, `JsonArrayLogger` 5. Unify implementation of `mill-profile.json` and `mill-par-profile.json` (renamed `mill-chrome-profile.json`), so they both perform streaming as-evaluation-progresses logging in a thread-safe manner (should fix https://github.com/com-lihaoyi/mill/issues/2540) 6. Add `dependencies` to `mill-profile.json`, making it more useful since you can now trace up and down the build graph without having to run `mill inspect` (which would wipe out the `mill-profile.json` you were just looking at!) --- docs/modules/ROOT/pages/Intro_to_Mill.adoc | 2 +- docs/modules/ROOT/pages/Out_Dir.adoc | 2 +- main/eval/src/mill/eval/Evaluator.scala | 25 +- main/eval/src/mill/eval/EvaluatorCore.scala | 200 ++++++ main/eval/src/mill/eval/EvaluatorImpl.scala | 638 +----------------- main/eval/src/mill/eval/GroupEvaluator.scala | 397 +++++++++++ main/eval/src/mill/eval/JsonArrayLogger.scala | 94 +++ .../src/mill/eval/ParallelProfileLogger.scala | 71 -- main/eval/src/mill/eval/Plan.scala | 45 ++ main/eval/src/mill/eval/Terminal.scala | 40 ++ main/eval/src/mill/eval/ThreadNumberer.scala | 12 + main/src/mill/main/MainModule.scala | 4 +- 12 files changed, 805 insertions(+), 725 deletions(-) create mode 100644 main/eval/src/mill/eval/EvaluatorCore.scala create mode 100644 main/eval/src/mill/eval/GroupEvaluator.scala create mode 100644 main/eval/src/mill/eval/JsonArrayLogger.scala delete mode 100644 main/eval/src/mill/eval/ParallelProfileLogger.scala create mode 100644 main/eval/src/mill/eval/Plan.scala create mode 100644 main/eval/src/mill/eval/Terminal.scala create mode 100644 main/eval/src/mill/eval/ThreadNumberer.scala diff --git a/docs/modules/ROOT/pages/Intro_to_Mill.adoc b/docs/modules/ROOT/pages/Intro_to_Mill.adoc index d2d85db1f5f..70dc547e00b 100644 --- a/docs/modules/ROOT/pages/Intro_to_Mill.adoc +++ b/docs/modules/ROOT/pages/Intro_to_Mill.adoc @@ -99,7 +99,7 @@ mill -j 4 __.compile To use as many threads as your machine has (logical) processor cores use `--jobs 0`. To disable parallel execution use `--jobs 1`. This is currently the default. -`mill -j` generates an output file in `out/mill-par-profile.json` that can be +`mill -j` generates an output file in `out/mill-chrome-profile.json` that can be loaded into the Chrome browser's `chrome://tracing` page for visualization. This can make it much easier to analyze your parallel runs to find out what's taking the most time: diff --git a/docs/modules/ROOT/pages/Out_Dir.adoc b/docs/modules/ROOT/pages/Out_Dir.adoc index 65bb8ef6731..2958c697837 100644 --- a/docs/modules/ROOT/pages/Out_Dir.adoc +++ b/docs/modules/ROOT/pages/Out_Dir.adoc @@ -106,7 +106,7 @@ There are also top-level build-related files in the `out/` folder, prefixed as ` Probably the most useful file for you. It logs the tasks run and time taken for the last Mill command you executed. This is very useful if Mill is being unexpectedly slow, and you want to find out exactly what tasks are being run. -`mill-par-profile.json`:: +`mill-chrome-profile.json`:: This file is only written if you run Mill in parallel mode, e.g. `mill --jobs 4`. This file can be opened in Google Chrome with the built-in `tracing:` protocol even while Mill is still running, so you get a nice chart of what's going on in parallel. `mill-worker-*/`:: diff --git a/main/eval/src/mill/eval/Evaluator.scala b/main/eval/src/mill/eval/Evaluator.scala index 89cc047e3d9..b11fddb1a76 100644 --- a/main/eval/src/mill/eval/Evaluator.scala +++ b/main/eval/src/mill/eval/Evaluator.scala @@ -10,6 +10,9 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter import scala.reflect.ClassTag import scala.util.DynamicVariable +/** + * Public facing API of the Mill evaluation logic. + */ trait Evaluator { def baseLogger: ColorLogger def rootModule: BaseModule @@ -28,7 +31,7 @@ trait Evaluator { def withBaseLogger(newBaseLogger: ColorLogger): Evaluator def withFailFast(newFailFast: Boolean): Evaluator - def plan(goals: Agg[Task[_]]): (MultiBiMap[Evaluator.Terminal, Task[_]], Agg[Task[_]]) + def plan(goals: Agg[Task[_]]): (MultiBiMap[Terminal, Task[_]], Agg[Task[_]]) /** * Evaluate given task(s) and return the successful result(s), or throw an exception. @@ -40,26 +43,6 @@ trait Evaluator { } object Evaluator { - - /** - * A terminal or terminal target is some important work unit, that in most cases has a name (Right[Labelled]) - * or was directly called by the user (Left[Task]). - * It's a T, T.worker, T.input, T.source, T.sources, T.persistent - */ - sealed trait Terminal { - def render: String - } - - object Terminal { - case class Labelled[T](task: NamedTask[T], segments: Segments) extends Terminal { - def render = segments.render - } - - case class Task[T](task: mill.define.Task[_]) extends Terminal { - def render = task.toString - } - } - trait Results { def rawValues: Seq[Result[Val]] def evaluated: Agg[Task[_]] diff --git a/main/eval/src/mill/eval/EvaluatorCore.scala b/main/eval/src/mill/eval/EvaluatorCore.scala new file mode 100644 index 00000000000..3fc1b30a886 --- /dev/null +++ b/main/eval/src/mill/eval/EvaluatorCore.scala @@ -0,0 +1,200 @@ +package mill.eval + +import mill.api.Result.{Aborted, Failing} +import mill.api.Strict.Agg +import mill.api.{Ctx, _} +import mill.define._ +import mill.eval.Evaluator.TaskResult +import mill.util._ + +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} +import scala.collection.mutable +import scala.concurrent._ + +/** + * Core logic of evaluating tasks, without any user-facing helper methods + */ +private[mill] trait EvaluatorCore extends GroupEvaluator { + + def baseLogger: ColorLogger + + /** + * @param goals The tasks that need to be evaluated + * @param reporter A function that will accept a module id and provide a listener for build problems in that module + * @param testReporter Listener for test events like start, finish with success/error + */ + def evaluate( + goals: Agg[Task[_]], + reporter: Int => Option[CompileProblemReporter] = _ => Option.empty[CompileProblemReporter], + testReporter: TestReporter = DummyTestReporter, + logger: ColorLogger = baseLogger + ): Evaluator.Results = { + os.makeDir.all(outPath) + + PathRef.validatedPaths.withValue(new PathRef.ValidatedPaths()) { + val ec = + if (effectiveThreadCount == 1) ExecutionContexts.RunNow + else new ExecutionContexts.ThreadPool(effectiveThreadCount) + + def contextLoggerMsg(threadId: Int) = + if (effectiveThreadCount == 1) "" + else s"[#${if (effectiveThreadCount > 9) f"$threadId%02d" else threadId}] " + + try evaluate0(goals, logger, reporter, testReporter, ec, contextLoggerMsg) + finally ec.close() + } + } + + private def getFailing( + sortedGroups: MultiBiMap[Terminal, Task[_]], + results: Map[Task[_], Evaluator.TaskResult[(Val, Int)]] + ): MultiBiMap.Mutable[Terminal, Failing[Val]] = { + val failing = new MultiBiMap.Mutable[Terminal, Result.Failing[Val]] + for ((k, vs) <- sortedGroups.items()) { + val failures = vs.items.flatMap(results.get).collect { + case Evaluator.TaskResult(f: Result.Failing[(Val, Int)], _) => f.map(_._1) + } + + failing.addAll(k, Loose.Agg.from(failures)) + } + failing + } + + private def evaluate0( + goals: Agg[Task[_]], + logger: ColorLogger, + reporter: Int => Option[CompileProblemReporter] = _ => Option.empty[CompileProblemReporter], + testReporter: TestReporter = DummyTestReporter, + ec: ExecutionContext with AutoCloseable, + contextLoggerMsg: Int => String + ): Evaluator.Results = { + implicit val implicitEc = ec + + os.makeDir.all(outPath) + val chromeProfileLogger = new ChromeProfileLogger(outPath / "mill-chrome-profile.json") + val profileLogger = new ProfileLogger(outPath / "mill-profile.json") + val threadNumberer = new ThreadNumberer() + val (sortedGroups, transitive) = Plan.plan(goals) + val interGroupDeps = findInterGroupDeps(sortedGroups) + val terminals = sortedGroups.keys().toVector + val failed = new AtomicBoolean(false) + val count = new AtomicInteger(1) + + val futures = mutable.Map.empty[Terminal, Future[Option[GroupEvaluator.Results]]] + + // We walk the task graph in topological order and schedule the futures + // to run asynchronously. During this walk, we store the scheduled futures + // in a dictionary. When scheduling each future, we are guaranteed that the + // necessary upstream futures will have already been scheduled and stored, + // due to the topological order of traversal. + for (terminal <- terminals) { + val deps = interGroupDeps(terminal) + futures(terminal) = Future.sequence(deps.map(futures)).map { upstreamValues => + if (failed.get()) None + else { + val upstreamResults = upstreamValues + .iterator + .flatMap(_.iterator.flatMap(_.newResults)) + .toMap + + val startTime = System.currentTimeMillis() + val threadId = threadNumberer.getThreadId(Thread.currentThread()) + val counterMsg = s"${count.getAndIncrement()}/${terminals.size}" + val contextLogger = PrefixLogger( + out = logger, + context = contextLoggerMsg(threadId), + tickerContext = GroupEvaluator.dynamicTickerPrefix.value + ) + + val res = evaluateGroupCached( + terminal = terminal, + group = sortedGroups.lookupKey(terminal), + results = upstreamResults, + counterMsg = counterMsg, + zincProblemReporter = reporter, + testReporter = testReporter, + logger = contextLogger + ) + + if (failFast && res.newResults.values.exists(_.result.asSuccess.isEmpty)) + failed.set(true) + + val endTime = System.currentTimeMillis() + + chromeProfileLogger.log( + task = Terminal.printTerm(terminal), + cat = "job", + startTime = startTime, + endTime = endTime, + threadId = threadNumberer.getThreadId(Thread.currentThread()), + cached = res.cached + ) + + profileLogger.log( + ProfileLogger.Timing( + terminal.render, + (endTime - startTime).toInt, + res.cached, + deps.map(_.render) + ) + ) + + Some(res) + } + } + } + + val finishedOptsMap = terminals + .map(t => (t, Await.result(futures(t), duration.Duration.Inf))) + .toMap + + val results0: Vector[(Task[_], TaskResult[(Val, Int)])] = terminals + .flatMap { t => + sortedGroups.lookupKey(t).flatMap { t0 => + finishedOptsMap(t) match { + case None => Some((t0, TaskResult(Aborted, () => Aborted))) + case Some(res) => res.newResults.get(t0).map(r => (t0, r)) + } + } + } + + val results: Map[Task[_], TaskResult[(Val, Int)]] = results0.toMap + + chromeProfileLogger.close() + profileLogger.close() + + EvaluatorCore.Results( + goals.indexed.map(results(_).map(_._1).result), + finishedOptsMap.map(_._2).flatMap(_.toSeq.flatMap(_.newEvaluated)), + transitive, + getFailing(sortedGroups, results), + results.map { case (k, v) => (k, v.map(_._1)) } + ) + } + + private def findInterGroupDeps(sortedGroups: MultiBiMap[Terminal, Task[_]]) + : Map[Terminal, Seq[Terminal]] = { + sortedGroups + .items() + .map { case (terminal, group) => + terminal -> Seq.from(group) + .flatMap(_.inputs) + .filterNot(group.contains) + .distinct + .map(sortedGroups.lookupValue) + .distinct + } + .toMap + } +} + +private[mill] object EvaluatorCore { + + case class Results( + rawValues: Seq[Result[Val]], + evaluated: Agg[Task[_]], + transitive: Agg[Task[_]], + failing: MultiBiMap[Terminal, Result.Failing[Val]], + results: Map[Task[_], TaskResult[Val]] + ) extends Evaluator.Results +} diff --git a/main/eval/src/mill/eval/EvaluatorImpl.scala b/main/eval/src/mill/eval/EvaluatorImpl.scala index 27b562a045b..eeb971fffef 100644 --- a/main/eval/src/mill/eval/EvaluatorImpl.scala +++ b/main/eval/src/mill/eval/EvaluatorImpl.scala @@ -1,31 +1,18 @@ package mill.eval -import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} -import scala.util.DynamicVariable -import mill.api.{ - CompileProblemReporter, - Ctx, - DummyTestReporter, - Loose, - PathRef, - Result, - Strict, - TestReporter, - Val -} -import mill.api.Result.{Aborted, Failing, OuterStack, Success} +import mill.api.Val + import mill.api.Strict.Agg import mill.define._ -import mill.eval.Evaluator.{TaskResult, Terminal} import mill.util._ import scala.collection.mutable -import scala.concurrent._ import scala.reflect.ClassTag -import scala.util.control.NonFatal /** - * Evaluate tasks. + * Implementation of [[Evaluator]], which serves both as internal logic as well + * as an odd bag of user-facing helper methods. Internal-only logic is + * extracted into [[EvaluatorCore]] */ private[mill] case class EvaluatorImpl( home: os.Path, @@ -39,11 +26,10 @@ private[mill] case class EvaluatorImpl( failFast: Boolean = true, threadCount: Option[Int] = Some(1), scriptImportGraph: Map[os.Path, (Int, Seq[os.Path])] = Map.empty -) extends Evaluator { +) extends Evaluator with EvaluatorCore { import EvaluatorImpl._ - val effectiveThreadCount: Int = - this.threadCount.getOrElse(Runtime.getRuntime().availableProcessors()) + val pathsResolver: EvaluatorPathsResolver = EvaluatorPathsResolver.default(outPath) override def withBaseLogger(newBaseLogger: ColorLogger): Evaluator = this.copy(baseLogger = newBaseLogger) @@ -51,539 +37,8 @@ private[mill] case class EvaluatorImpl( override def withFailFast(newFailFast: Boolean): Evaluator = this.copy(failFast = newFailFast) - val pathsResolver: EvaluatorPathsResolver = EvaluatorPathsResolver.default(outPath) - - /** - * @param goals The tasks that need to be evaluated - * @param reporter A function that will accept a module id and provide a listener for build problems in that module - * @param testReporter Listener for test events like start, finish with success/error - */ - def evaluate( - goals: Agg[Task[_]], - reporter: Int => Option[CompileProblemReporter] = _ => Option.empty[CompileProblemReporter], - testReporter: TestReporter = DummyTestReporter, - logger: ColorLogger = baseLogger - ): Evaluator.Results = { - os.makeDir.all(outPath) - - PathRef.validatedPaths.withValue(new PathRef.ValidatedPaths()) { - val ec = - if (effectiveThreadCount == 1) ExecutionContexts.RunNow - else new ExecutionContexts.ThreadPool(effectiveThreadCount) - - def contextLoggerMsg(threadId: Int) = - if (effectiveThreadCount == 1) "" - else s"[#${if (effectiveThreadCount > 9) f"$threadId%02d" else threadId}] " - - try evaluate0(goals, logger, reporter, testReporter, ec, contextLoggerMsg) - finally ec.close() - } - } - - def getFailing( - sortedGroups: MultiBiMap[Terminal, Task[_]], - results: Map[Task[_], TaskResult[(Val, Int)]] - ): MultiBiMap.Mutable[Terminal, Failing[Val]] = { - val failing = new MultiBiMap.Mutable[Terminal, Result.Failing[Val]] - for ((k, vs) <- sortedGroups.items()) { - val failures = vs.items.flatMap(results.get).collect { - case TaskResult(f: Result.Failing[(Val, Int)], _) => f.map(_._1) - } - - failing.addAll(k, Loose.Agg.from(failures)) - } - failing - } - - def evaluate0( - goals: Agg[Task[_]], - logger: ColorLogger, - reporter: Int => Option[CompileProblemReporter] = _ => Option.empty[CompileProblemReporter], - testReporter: TestReporter = DummyTestReporter, - ec: ExecutionContext with AutoCloseable, - contextLoggerMsg: Int => String - ): Evaluator.Results = { - implicit val implicitEc = ec - - os.makeDir.all(outPath) - val timeLog = new ParallelProfileLogger(outPath, System.currentTimeMillis()) - val timings = mutable.ArrayBuffer.empty[(Terminal, Int, Boolean)] - val (sortedGroups, transitive) = EvaluatorImpl.plan(goals) - val interGroupDeps = findInterGroupDeps(sortedGroups) - val terminals = sortedGroups.keys().toVector - val failed = new AtomicBoolean(false) - val count = new AtomicInteger(1) - val futures = mutable.Map.empty[Terminal, Future[Option[TaskGroupResults]]] - - // We walk the task graph in topological order and schedule the futures - // to run asynchronously. During this walk, we store the scheduled futures - // in a dictionary. When scheduling each future, we are guaranteed that the - // necessary upstream futures will have already been scheduled and stored, - // due to the topological order of traversal. - for (terminal <- terminals) { - val deps = interGroupDeps(terminal) - futures(terminal) = Future.sequence(deps.map(futures)).map { upstreamValues => - if (failed.get()) None - else { - val upstreamResults = upstreamValues - .iterator - .flatMap(_.iterator.flatMap(_.newResults)) - .toMap - - val startTime = System.currentTimeMillis() - val threadId = timeLog.getThreadId(Thread.currentThread().getName()) - val counterMsg = s"${count.getAndIncrement()}/${terminals.size}" - val contextLogger = PrefixLogger( - out = logger, - context = contextLoggerMsg(threadId), - tickerContext = EvaluatorImpl.dynamicTickerPrefix.value - ) - - val res = evaluateGroupCached( - terminal = terminal, - group = sortedGroups.lookupKey(terminal), - results = upstreamResults, - counterMsg = counterMsg, - zincProblemReporter = reporter, - testReporter = testReporter, - logger = contextLogger - ) - - if (failFast && res.newResults.values.exists(_.result.asSuccess.isEmpty)) - failed.set(true) - - val endTime = System.currentTimeMillis() - timeLog.timeTrace( - task = printTerm(terminal), - cat = "job", - startTime = startTime, - endTime = endTime, - thread = Thread.currentThread().getName(), - cached = res.cached - ) - timings.append((terminal, (endTime - startTime).toInt, res.cached)) - Some(res) - } - } - } - - EvaluatorImpl.writeTimings(timings.toSeq, outPath) - - val finishedOptsMap = terminals - .map(t => (t, Await.result(futures(t), duration.Duration.Inf))) - .toMap - - val results0: Vector[(Task[_], TaskResult[(Val, Int)])] = terminals - .flatMap { t => - sortedGroups.lookupKey(t).flatMap { t0 => - finishedOptsMap(t) match { - case None => Some((t0, TaskResult(Aborted, () => Aborted))) - case Some(res) => res.newResults.get(t0).map(r => (t0, r)) - } - } - } - - val results: Map[Task[_], TaskResult[(Val, Int)]] = results0.toMap - - timeLog.close() - - EvaluatorImpl.Results( - goals.indexed.map(results(_).map(_._1).result), - finishedOptsMap.map(_._2).flatMap(_.toSeq.flatMap(_.newEvaluated)), - transitive, - getFailing(sortedGroups, results), - results.map { case (k, v) => (k, v.map(_._1)) } - ) - } - - // those result which are inputs but not contained in this terminal group - def evaluateGroupCached( - terminal: Terminal, - group: Agg[Task[_]], - results: Map[Task[_], TaskResult[(Val, Int)]], - counterMsg: String, - zincProblemReporter: Int => Option[CompileProblemReporter], - testReporter: TestReporter, - logger: ColorLogger - ): TaskGroupResults = { - - val externalInputsHash = scala.util.hashing.MurmurHash3.orderedHash( - group.items.flatMap(_.inputs).filter(!group.contains(_)) - .flatMap(results(_).result.asSuccess.map(_.value._2)) - ) - - val sideHashes = scala.util.hashing.MurmurHash3.orderedHash( - group.iterator.map(_.sideHash) - ) - - val scriptsHash = { - val possibleScripts = scriptImportGraph.keySet.map(_.toString) - val scripts = new Loose.Agg.Mutable[os.Path]() - group.iterator.flatMap(t => Iterator(t) ++ t.inputs).foreach { - // Filter out the `fileName` as a string before we call `os.Path` on it, because - // otherwise linux paths on earlier-compiled artifacts can cause this to crash - // when running on Windows with a different file path format - case namedTask: NamedTask[_] if possibleScripts.contains(namedTask.ctx.fileName) => - scripts.append(os.Path(namedTask.ctx.fileName)) - case _ => - } - - val transitiveScripts = Graph.transitiveNodes(scripts)(t => - scriptImportGraph.get(t).map(_._2).getOrElse(Nil) - ) - - transitiveScripts - .iterator - // Sometimes tasks are defined in external/upstreadm dependencies, - // (e.g. a lot of tasks come from JavaModule.scala) and won't be - // present in the scriptImportGraph - .map(p => scriptImportGraph.get(p).fold(0)(_._1)) - .sum - } - - val inputsHash = externalInputsHash + sideHashes + classLoaderSigHash + scriptsHash - - terminal match { - case Terminal.Task(task) => - val (newResults, newEvaluated) = evaluateGroup( - group, - results, - inputsHash, - paths = None, - maybeTargetLabel = None, - counterMsg = counterMsg, - zincProblemReporter, - testReporter, - logger - ) - TaskGroupResults(newResults, newEvaluated.toSeq, false) - - case labelled: Terminal.Labelled[_] => - val out = - if (!labelled.task.ctx.external) outPath - else externalOutPath - - val paths = EvaluatorPaths.resolveDestPaths( - out, - destSegments(labelled) - ) - - val cached = loadCachedJson(logger, inputsHash, labelled, paths) - - val upToDateWorker = loadUpToDateWorker(logger, inputsHash, labelled) - - upToDateWorker.map((_, inputsHash)) orElse cached match { - case Some((v, hashCode)) => - val res = Result.Success((v, hashCode)) - val newResults: Map[Task[_], TaskResult[(Val, Int)]] = - Map(labelled.task -> TaskResult(res, () => res)) - - TaskGroupResults(newResults, Nil, cached = true) - - case _ => - // uncached - if (labelled.task.flushDest) os.remove.all(paths.dest) - - val targetLabel = printTerm(terminal) - - val (newResults, newEvaluated) = - EvaluatorImpl.dynamicTickerPrefix.withValue(s"[$counterMsg] $targetLabel > ") { - evaluateGroup( - group, - results, - inputsHash, - paths = Some(paths), - maybeTargetLabel = Some(targetLabel), - counterMsg = counterMsg, - zincProblemReporter, - testReporter, - logger - ) - } - - newResults(labelled.task) match { - case TaskResult(Result.Failure(_, Some((v, _))), _) => - handleTaskResult(v, v.##, paths.meta, inputsHash, labelled) - - case TaskResult(Result.Success((v, _)), _) => - handleTaskResult(v, v.##, paths.meta, inputsHash, labelled) - - case _ => - // Wipe out any cached meta.json file that exists, so - // a following run won't look at the cached metadata file and - // assume it's associated with the possibly-borked state of the - // destPath after an evaluation failure. - os.remove.all(paths.meta) - } - - TaskGroupResults(newResults, newEvaluated.toSeq, cached = false) - } - } - } - - def loadUpToDateWorker( - logger: ColorLogger, - inputsHash: Int, - labelled: Terminal.Labelled[_] - ): Option[Val] = { - labelled.task.asWorker - .flatMap { w => workerCache.synchronized { workerCache.get(w.ctx.segments) } } - .flatMap { - case (`inputsHash`, upToDate) => Some(upToDate) // worker cached and up-to-date - case (_, Val(obsolete: AutoCloseable)) => - // worker cached but obsolete, needs to be closed - try { - logger.debug(s"Closing previous worker: ${labelled.segments.render}") - obsolete.close() - } catch { - case NonFatal(e) => - logger.error( - s"${labelled.segments.render}: Errors while closing obsolete worker: ${e.getMessage()}" - ) - } - // make sure, we can no longer re-use a closed worker - labelled.task.asWorker.foreach { w => - workerCache.synchronized { - workerCache.remove(w.ctx.segments) - } - } - None - - case _ => None // worker not cached or obsolete - } - } - - def loadCachedJson( - logger: ColorLogger, - inputsHash: Int, - labelled: Terminal.Labelled[_], - paths: EvaluatorPaths - ): Option[(Val, Int)] = { - for { - cached <- - try Some(upickle.default.read[Evaluator.Cached](paths.meta.toIO)) - catch { - case NonFatal(_) => None - } - if cached.inputsHash == inputsHash - reader <- labelled.task.readWriterOpt - parsed <- - try Some(upickle.default.read(cached.value)(reader)) - catch { - case e: PathRef.PathRefValidationException => - logger.debug( - s"${labelled.segments.render}: re-evaluating; ${e.getMessage}" - ) - None - case NonFatal(_) => None - } - } yield (Val(parsed), cached.valueHash) - } - - def destSegments(labelledTask: Terminal.Labelled[_]): Segments = { - labelledTask.task.ctx.foreign match { - case Some(foreignSegments) => foreignSegments ++ labelledTask.segments - case None => labelledTask.segments - } - } - - def handleTaskResult( - v: Val, - hashCode: Int, - metaPath: os.Path, - inputsHash: Int, - labelled: Terminal.Labelled[_] - ): Unit = { - labelled.task.asWorker match { - case Some(w) => - workerCache.synchronized { - workerCache.update(w.ctx.segments, (inputsHash, v)) - } - case None => - val terminalResult = labelled - .task - .writerOpt - .asInstanceOf[Option[upickle.default.Writer[Any]]] - .map { w => upickle.default.writeJs(v.value)(w) } - - for (json <- terminalResult) { - os.write.over( - metaPath, - upickle.default.stream( - Evaluator.Cached(json, hashCode, inputsHash), - indent = 4 - ), - createFolders = true - ) - } - } - } - - def evaluateGroup( - group: Agg[Task[_]], - results: Map[Task[_], TaskResult[(Val, Int)]], - inputsHash: Int, - paths: Option[EvaluatorPaths], - maybeTargetLabel: Option[String], - counterMsg: String, - reporter: Int => Option[CompileProblemReporter], - testReporter: TestReporter, - logger: mill.api.Logger - ): (Map[Task[_], TaskResult[(Val, Int)]], mutable.Buffer[Task[_]]) = { - - def computeAll(enableTicker: Boolean) = { - val newEvaluated = mutable.Buffer.empty[Task[_]] - val newResults = mutable.Map.empty[Task[_], Result[(Val, Int)]] - - val nonEvaluatedTargets = group.indexed.filterNot(results.contains) - - // should we log progress? - val logRun = maybeTargetLabel.isDefined && { - val inputResults = for { - target <- nonEvaluatedTargets - item <- target.inputs.filterNot(group.contains) - } yield results(item).map(_._1) - inputResults.forall(_.result.isInstanceOf[Result.Success[_]]) - } - - val tickerPrefix = maybeTargetLabel.map { targetLabel => - val prefix = s"[$counterMsg] $targetLabel " - if (logRun && enableTicker) logger.ticker(prefix) - prefix + "| " - } - - val multiLogger = new ProxyLogger(resolveLogger(paths.map(_.log), logger)) { - override def ticker(s: String): Unit = { - if (enableTicker) super.ticker(tickerPrefix.getOrElse("") + s) - else () // do nothing - } - } - // This is used to track the usage of `T.dest` in more than one Task - // But it's not really clear what issue we try to prevent here - // Vice versa, being able to use T.dest in multiple `T.task` - // is rather essential to split up larger tasks into small parts - // So I like to disable this detection for now - var usedDest = Option.empty[(Task[_], Array[StackTraceElement])] - for (task <- nonEvaluatedTargets) { - newEvaluated.append(task) - val targetInputValues = task.inputs - .map { x => newResults.getOrElse(x, results(x).result) } - .collect { case Result.Success((v, _)) => v } - - val res = { - if (targetInputValues.length != task.inputs.length) Result.Skipped - else { - val args = new Ctx( - args = targetInputValues.map(_.value).toIndexedSeq, - dest0 = () => - paths match { - case Some(dest) => - if (usedDest.isEmpty) os.makeDir.all(dest.dest) - usedDest = Some((task, new Exception().getStackTrace)) - dest.dest - - case None => throw new Exception("No `dest` folder available here") - }, - log = multiLogger, - home = home, - env = env, - reporter = reporter, - testReporter = testReporter, - workspace = rootModule.millSourcePath - ) with mill.api.Ctx.Jobs { - override def jobs: Int = effectiveThreadCount - } - - mill.api.SystemStreams.withStreams(multiLogger.systemStreams) { - try task.evaluate(args).map(Val(_)) - catch { - case NonFatal(e) => - Result.Exception( - e, - new OuterStack(new Exception().getStackTrace.toIndexedSeq) - ) - } - } - } - } - - newResults(task) = for (v <- res) yield { - ( - v, - if (task.isInstanceOf[Worker[_]]) inputsHash - else v.## - ) - } - } - multiLogger.close() - (newResults, newEvaluated) - } - - val (newResults, newEvaluated) = computeAll(enableTicker = true) - - if (!failFast) maybeTargetLabel.foreach { targetLabel => - val taskFailed = newResults.exists(task => !task._2.isInstanceOf[Success[_]]) - if (taskFailed) { - logger.error(s"[${counterMsg}] ${targetLabel} failed") - } - } - - ( - newResults - .map { case (k, v) => - val recalc = () => computeAll(enableTicker = false)._1.apply(k) - val taskResult = TaskResult(v, recalc) - (k, taskResult) - } - .toMap, - newEvaluated - ) - } - - def resolveLogger(logPath: Option[os.Path], logger: mill.api.Logger): mill.api.Logger = - logPath match { - case None => logger - case Some(path) => new MultiLogger( - logger.colored, - logger, - // we always enable debug here, to get some more context in log files - new FileLogger(logger.colored, path, debugEnabled = true), - logger.systemStreams.in, - debugEnabled = logger.debugEnabled - ) - } - - // TODO: we could track the deps of the dependency chain, to prioritize tasks with longer chain - // TODO: we could also track the number of other tasks that depends on a task to prioritize - def findInterGroupDeps(sortedGroups: MultiBiMap[Terminal, Task[_]]) - : Map[Terminal, Seq[Terminal]] = { - sortedGroups - .items() - .map { case (terminal, group) => - terminal -> Seq.from(group) - .flatMap(_.inputs) - .filterNot(group.contains) - .distinct - .map(sortedGroups.lookupValue) - .distinct - } - .toMap - } - - def printTerm(term: Terminal): String = term match { - case Terminal.Task(task) => task.toString() - case labelled: Terminal.Labelled[_] => - val Seq(first, rest @ _*) = destSegments(labelled).value - val msgParts = Seq(first.asInstanceOf[Segment.Label].value) ++ rest.map { - case Segment.Label(s) => "." + s - case Segment.Cross(s) => "[" + s.mkString(",") + "]" - } - msgParts.mkString - } - - override def plan(goals: Agg[Task[_]]) - : (MultiBiMap[Evaluator.Terminal, Task[_]], Agg[Task[_]]) = { - EvaluatorImpl.plan(goals) + override def plan(goals: Agg[Task[_]]): (MultiBiMap[Terminal, Task[_]], Agg[Task[_]]) = { + Plan.plan(goals) } override def evalOrThrow(exceptionFactory: Evaluator.Results => Throwable) @@ -611,79 +66,4 @@ private[mill] object EvaluatorImpl { case r => r.values.map(_.value).asInstanceOf[Seq[T]] } } - - case class Results( - rawValues: Seq[Result[Val]], - evaluated: Agg[Task[_]], - transitive: Agg[Task[_]], - failing: MultiBiMap[Terminal, Result.Failing[Val]], - results: Map[Task[_], TaskResult[Val]] - ) extends Evaluator.Results - - case class Timing(label: String, millis: Int, cached: Boolean) - - object Timing { - implicit val readWrite: upickle.default.ReadWriter[Timing] = upickle.default.macroRW - } - - def writeTimings( - timings: Seq[(Terminal, Int, Boolean)], - outPath: os.Path - ): Unit = { - os.write.over( - outPath / "mill-profile.json", - upickle.default.stream( - timings.map { case (k, v, b) => - EvaluatorImpl.Timing(k.render, v, b) - }, - indent = 4 - ) - ) - } - - def plan(goals: Agg[Task[_]]): (MultiBiMap[Terminal, Task[_]], Strict.Agg[Task[_]]) = { - val transitive = Graph.transitiveTargets(goals) - val topoSorted = Graph.topoSorted(transitive) - val seen = collection.mutable.Set.empty[Segments] - val overridden = collection.mutable.Set.empty[Task[_]] - topoSorted.values.reverse.iterator.foreach { - case x: NamedTask[_] if x.isPrivate == Some(true) => - // we always need to store them in the super-path - overridden.add(x) - case x: NamedTask[_] => - if (!seen.contains(x.ctx.segments)) seen.add(x.ctx.segments) - else overridden.add(x) - case _ => // donothing - } - - val sortedGroups: MultiBiMap[Terminal, Task[_]] = - Graph.groupAroundImportantTargets(topoSorted) { - // important: all named tasks and those explicitly requested - case t: NamedTask[Any] => - val segments = t.ctx.segments - val augmentedSegments = - if (!overridden(t)) segments - else { - val Segment.Label(tName) = segments.value.last - Segments( - segments.value.init ++ - Seq(Segment.Label(tName + ".super")) ++ - t.ctx.enclosing.split("[.# ]").map(Segment.Label) - ) - } - Terminal.Labelled(t, augmentedSegments) - - case t if goals.contains(t) => Terminal.Task(t) - } - - (sortedGroups, transitive) - } - - case class TaskGroupResults( - newResults: Map[Task[_], TaskResult[(Val, Int)]], - newEvaluated: Seq[Task[_]], - cached: Boolean - ) - - val dynamicTickerPrefix = new DynamicVariable("") } diff --git a/main/eval/src/mill/eval/GroupEvaluator.scala b/main/eval/src/mill/eval/GroupEvaluator.scala new file mode 100644 index 00000000000..aee23bf9a0a --- /dev/null +++ b/main/eval/src/mill/eval/GroupEvaluator.scala @@ -0,0 +1,397 @@ +package mill.eval + +import mill.api.Result.{OuterStack, Success} +import mill.api.Strict.Agg +import mill.api._ +import mill.define._ +import mill.eval.Evaluator.TaskResult +import mill.util._ + +import scala.collection.mutable +import scala.util.DynamicVariable +import scala.util.control.NonFatal + +/** + * Logic around evaluating a single group, which is a collection of [[Task]]s + * with a single [[Terminal]]. + */ +private[mill] trait GroupEvaluator { + def home: os.Path + def outPath: os.Path + def externalOutPath: os.Path + def rootModule: mill.define.BaseModule + def classLoaderSigHash: Int + def workerCache: mutable.Map[Segments, (Int, Val)] + def env: Map[String, String] + def failFast: Boolean + def threadCount: Option[Int] + def scriptImportGraph: Map[os.Path, (Int, Seq[os.Path])] + + val effectiveThreadCount: Int = + this.threadCount.getOrElse(Runtime.getRuntime().availableProcessors()) + + // those result which are inputs but not contained in this terminal group + def evaluateGroupCached( + terminal: Terminal, + group: Agg[Task[_]], + results: Map[Task[_], TaskResult[(Val, Int)]], + counterMsg: String, + zincProblemReporter: Int => Option[CompileProblemReporter], + testReporter: TestReporter, + logger: ColorLogger + ): GroupEvaluator.Results = { + + val externalInputsHash = scala.util.hashing.MurmurHash3.orderedHash( + group.items.flatMap(_.inputs).filter(!group.contains(_)) + .flatMap(results(_).result.asSuccess.map(_.value._2)) + ) + + val sideHashes = scala.util.hashing.MurmurHash3.orderedHash( + group.iterator.map(_.sideHash) + ) + + val scriptsHash = { + val possibleScripts = scriptImportGraph.keySet.map(_.toString) + val scripts = new Loose.Agg.Mutable[os.Path]() + group.iterator.flatMap(t => Iterator(t) ++ t.inputs).foreach { + // Filter out the `fileName` as a string before we call `os.Path` on it, because + // otherwise linux paths on earlier-compiled artifacts can cause this to crash + // when running on Windows with a different file path format + case namedTask: NamedTask[_] if possibleScripts.contains(namedTask.ctx.fileName) => + scripts.append(os.Path(namedTask.ctx.fileName)) + case _ => + } + + val transitiveScripts = Graph.transitiveNodes(scripts)(t => + scriptImportGraph.get(t).map(_._2).getOrElse(Nil) + ) + + transitiveScripts + .iterator + // Sometimes tasks are defined in external/upstreadm dependencies, + // (e.g. a lot of tasks come from JavaModule.scala) and won't be + // present in the scriptImportGraph + .map(p => scriptImportGraph.get(p).fold(0)(_._1)) + .sum + } + + val inputsHash = externalInputsHash + sideHashes + classLoaderSigHash + scriptsHash + + terminal match { + case Terminal.Task(task) => + val (newResults, newEvaluated) = evaluateGroup( + group, + results, + inputsHash, + paths = None, + maybeTargetLabel = None, + counterMsg = counterMsg, + zincProblemReporter, + testReporter, + logger + ) + GroupEvaluator.Results(newResults, newEvaluated.toSeq, false) + + case labelled: Terminal.Labelled[_] => + val out = + if (!labelled.task.ctx.external) outPath + else externalOutPath + + val paths = EvaluatorPaths.resolveDestPaths( + out, + Terminal.destSegments(labelled) + ) + + val cached = loadCachedJson(logger, inputsHash, labelled, paths) + + val upToDateWorker = loadUpToDateWorker(logger, inputsHash, labelled) + + upToDateWorker.map((_, inputsHash)) orElse cached match { + case Some((v, hashCode)) => + val res = Result.Success((v, hashCode)) + val newResults: Map[Task[_], TaskResult[(Val, Int)]] = + Map(labelled.task -> TaskResult(res, () => res)) + + GroupEvaluator.Results(newResults, Nil, cached = true) + + case _ => + // uncached + if (labelled.task.flushDest) os.remove.all(paths.dest) + + val targetLabel = Terminal.printTerm(terminal) + + val (newResults, newEvaluated) = + GroupEvaluator.dynamicTickerPrefix.withValue(s"[$counterMsg] $targetLabel > ") { + evaluateGroup( + group, + results, + inputsHash, + paths = Some(paths), + maybeTargetLabel = Some(targetLabel), + counterMsg = counterMsg, + zincProblemReporter, + testReporter, + logger + ) + } + + newResults(labelled.task) match { + case TaskResult(Result.Failure(_, Some((v, _))), _) => + handleTaskResult(v, v.##, paths.meta, inputsHash, labelled) + + case TaskResult(Result.Success((v, _)), _) => + handleTaskResult(v, v.##, paths.meta, inputsHash, labelled) + + case _ => + // Wipe out any cached meta.json file that exists, so + // a following run won't look at the cached metadata file and + // assume it's associated with the possibly-borked state of the + // destPath after an evaluation failure. + os.remove.all(paths.meta) + } + + GroupEvaluator.Results(newResults, newEvaluated.toSeq, cached = false) + } + } + + } + + private def evaluateGroup( + group: Agg[Task[_]], + results: Map[Task[_], TaskResult[(Val, Int)]], + inputsHash: Int, + paths: Option[EvaluatorPaths], + maybeTargetLabel: Option[String], + counterMsg: String, + reporter: Int => Option[CompileProblemReporter], + testReporter: TestReporter, + logger: mill.api.Logger + ): (Map[Task[_], TaskResult[(Val, Int)]], mutable.Buffer[Task[_]]) = { + + def computeAll(enableTicker: Boolean) = { + val newEvaluated = mutable.Buffer.empty[Task[_]] + val newResults = mutable.Map.empty[Task[_], Result[(Val, Int)]] + + val nonEvaluatedTargets = group.indexed.filterNot(results.contains) + + // should we log progress? + val logRun = maybeTargetLabel.isDefined && { + val inputResults = for { + target <- nonEvaluatedTargets + item <- target.inputs.filterNot(group.contains) + } yield results(item).map(_._1) + inputResults.forall(_.result.isInstanceOf[Result.Success[_]]) + } + + val tickerPrefix = maybeTargetLabel.map { targetLabel => + val prefix = s"[$counterMsg] $targetLabel " + if (logRun && enableTicker) logger.ticker(prefix) + prefix + "| " + } + + val multiLogger = new ProxyLogger(resolveLogger(paths.map(_.log), logger)) { + override def ticker(s: String): Unit = { + if (enableTicker) super.ticker(tickerPrefix.getOrElse("") + s) + else () // do nothing + } + } + // This is used to track the usage of `T.dest` in more than one Task + // But it's not really clear what issue we try to prevent here + // Vice versa, being able to use T.dest in multiple `T.task` + // is rather essential to split up larger tasks into small parts + // So I like to disable this detection for now + var usedDest = Option.empty[(Task[_], Array[StackTraceElement])] + for (task <- nonEvaluatedTargets) { + newEvaluated.append(task) + val targetInputValues = task.inputs + .map { x => newResults.getOrElse(x, results(x).result) } + .collect { case Result.Success((v, _)) => v } + + val res = { + if (targetInputValues.length != task.inputs.length) Result.Skipped + else { + val args = new mill.api.Ctx( + args = targetInputValues.map(_.value).toIndexedSeq, + dest0 = () => + paths match { + case Some(dest) => + if (usedDest.isEmpty) os.makeDir.all(dest.dest) + usedDest = Some((task, new Exception().getStackTrace)) + dest.dest + + case None => throw new Exception("No `dest` folder available here") + }, + log = multiLogger, + home = home, + env = env, + reporter = reporter, + testReporter = testReporter, + workspace = rootModule.millSourcePath + ) with mill.api.Ctx.Jobs { + override def jobs: Int = effectiveThreadCount + } + + mill.api.SystemStreams.withStreams(multiLogger.systemStreams) { + try task.evaluate(args).map(Val(_)) + catch { + case NonFatal(e) => + Result.Exception( + e, + new OuterStack(new Exception().getStackTrace.toIndexedSeq) + ) + } + } + } + } + + newResults(task) = for (v <- res) yield { + ( + v, + if (task.isInstanceOf[Worker[_]]) inputsHash + else v.## + ) + } + } + multiLogger.close() + (newResults, newEvaluated) + } + + val (newResults, newEvaluated) = computeAll(enableTicker = true) + + if (!failFast) maybeTargetLabel.foreach { targetLabel => + val taskFailed = newResults.exists(task => !task._2.isInstanceOf[Success[_]]) + if (taskFailed) { + logger.error(s"[${counterMsg}] ${targetLabel} failed") + } + } + + ( + newResults + .map { case (k, v) => + val recalc = () => computeAll(enableTicker = false)._1.apply(k) + val taskResult = TaskResult(v, recalc) + (k, taskResult) + } + .toMap, + newEvaluated + ) + } + + private def handleTaskResult( + v: Val, + hashCode: Int, + metaPath: os.Path, + inputsHash: Int, + labelled: Terminal.Labelled[_] + ): Unit = { + labelled.task.asWorker match { + case Some(w) => + workerCache.synchronized { + workerCache.update(w.ctx.segments, (inputsHash, v)) + } + case None => + val terminalResult = labelled + .task + .writerOpt + .asInstanceOf[Option[upickle.default.Writer[Any]]] + .map { w => upickle.default.writeJs(v.value)(w) } + + for (json <- terminalResult) { + os.write.over( + metaPath, + upickle.default.stream( + Evaluator.Cached(json, hashCode, inputsHash), + indent = 4 + ), + createFolders = true + ) + } + } + } + + def resolveLogger(logPath: Option[os.Path], logger: mill.api.Logger): mill.api.Logger = + logPath match { + case None => logger + case Some(path) => new MultiLogger( + logger.colored, + logger, + // we always enable debug here, to get some more context in log files + new FileLogger(logger.colored, path, debugEnabled = true), + logger.systemStreams.in, + debugEnabled = logger.debugEnabled + ) + } + + private def loadCachedJson( + logger: ColorLogger, + inputsHash: Int, + labelled: Terminal.Labelled[_], + paths: EvaluatorPaths + ): Option[(Val, Int)] = { + for { + cached <- + try Some(upickle.default.read[Evaluator.Cached](paths.meta.toIO)) + catch { + case NonFatal(_) => None + } + if cached.inputsHash == inputsHash + reader <- labelled.task.readWriterOpt + parsed <- + try Some(upickle.default.read(cached.value)(reader)) + catch { + case e: PathRef.PathRefValidationException => + logger.debug( + s"${labelled.segments.render}: re-evaluating; ${e.getMessage}" + ) + None + case NonFatal(_) => None + } + } yield (Val(parsed), cached.valueHash) + } + + private def loadUpToDateWorker( + logger: ColorLogger, + inputsHash: Int, + labelled: Terminal.Labelled[_] + ): Option[Val] = { + labelled.task.asWorker + .flatMap { w => + workerCache.synchronized { + workerCache.get(w.ctx.segments) + } + } + .flatMap { + case (`inputsHash`, upToDate) => Some(upToDate) // worker cached and up-to-date + case (_, Val(obsolete: AutoCloseable)) => + // worker cached but obsolete, needs to be closed + try { + logger.debug(s"Closing previous worker: ${labelled.segments.render}") + obsolete.close() + } catch { + case NonFatal(e) => + logger.error( + s"${labelled.segments.render}: Errors while closing obsolete worker: ${e.getMessage()}" + ) + } + // make sure, we can no longer re-use a closed worker + labelled.task.asWorker.foreach { w => + workerCache.synchronized { + workerCache.remove(w.ctx.segments) + } + } + None + + case _ => None // worker not cached or obsolete + } + } +} + +private[mill] object GroupEvaluator { + val dynamicTickerPrefix = new DynamicVariable("") + + case class Results( + newResults: Map[Task[_], TaskResult[(Val, Int)]], + newEvaluated: Seq[Task[_]], + cached: Boolean + ) +} diff --git a/main/eval/src/mill/eval/JsonArrayLogger.scala b/main/eval/src/mill/eval/JsonArrayLogger.scala new file mode 100644 index 00000000000..2417eab53d1 --- /dev/null +++ b/main/eval/src/mill/eval/JsonArrayLogger.scala @@ -0,0 +1,94 @@ +package mill.eval + +import java.io.PrintStream +import java.nio.file.{Files, StandardOpenOption} + +private class JsonArrayLogger[T: upickle.default.Writer](outPath: os.Path, indent: Int) { + private var used = false + + private lazy val traceStream = { + val options = Seq( + Seq(StandardOpenOption.CREATE, StandardOpenOption.WRITE), + Seq(StandardOpenOption.TRUNCATE_EXISTING) + ).flatten + os.makeDir.all(outPath / os.up) + new PrintStream(Files.newOutputStream(outPath.toNIO, options: _*)) + } + + def log(t: T) = synchronized { + if (used) traceStream.println(",") + else traceStream.println("[") + used = true + val indented = upickle.default.write(t, indent = indent) + .linesIterator + .map(" " * indent + _) + .mkString("\n") + + traceStream.print(indented) + } + + def close(): Unit = synchronized { + traceStream.println() + traceStream.println("]") + traceStream.close() + } +} + +private class ProfileLogger(outPath: os.Path) + extends JsonArrayLogger[ProfileLogger.Timing](outPath, indent = 2) + +private object ProfileLogger { + case class Timing(label: String, millis: Int, cached: Boolean, dependencies: Seq[String] = Nil) + + object Timing { + implicit val readWrite: upickle.default.ReadWriter[Timing] = upickle.default.macroRW + } +} + +private class ChromeProfileLogger(outPath: os.Path) + extends JsonArrayLogger[ChromeProfileLogger.TraceEvent](outPath, indent = -1) { + + def log( + task: String, + cat: String, + startTime: Long, + endTime: Long, + threadId: Int, + cached: Boolean + ): Unit = { + + val event = ChromeProfileLogger.TraceEvent( + name = task, + cat = cat, + ph = "X", + ts = startTime * 1000, + dur = (endTime - startTime) * 1000 /*chrome treats the duration as microseconds*/, + pid = 1, + tid = threadId, + args = if (cached) Seq("cached") else Seq() + ) + log(event) + } +} + +private object ChromeProfileLogger { + + /** + * Trace Event Format, that can be loaded with Google Chrome via chrome://tracing + * See https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/ + */ + case class TraceEvent( + name: String, + cat: String, + ph: String, + ts: Long, + dur: Long, + pid: Int, + tid: Int, + args: Seq[String] + ) + + object TraceEvent { + implicit val readWrite: upickle.default.ReadWriter[TraceEvent] = upickle.default.macroRW + } +} diff --git a/main/eval/src/mill/eval/ParallelProfileLogger.scala b/main/eval/src/mill/eval/ParallelProfileLogger.scala deleted file mode 100644 index 80b6d2d8770..00000000000 --- a/main/eval/src/mill/eval/ParallelProfileLogger.scala +++ /dev/null @@ -1,71 +0,0 @@ -package mill.eval - -import java.io.PrintStream -import java.nio.file.{Files, StandardOpenOption} - -private class ParallelProfileLogger(outPath: os.Path, startTime: Long) { - private var used = false - private val threadIds = collection.mutable.Map.empty[String, Int] - lazy val traceStream = { - val options = Seq( - Seq(StandardOpenOption.CREATE, StandardOpenOption.WRITE), - Seq(StandardOpenOption.TRUNCATE_EXISTING) - ).flatten - os.makeDir.all(outPath) - new PrintStream(Files.newOutputStream((outPath / "mill-par-profile.json").toNIO, options: _*)) - } - - def getThreadId(thread: String) = synchronized { - threadIds.getOrElseUpdate(thread, threadIds.size) - } - def timeTrace( - task: String, - cat: String, - startTime: Long, - endTime: Long, - thread: String, - cached: Boolean - ): Unit = synchronized { - traceStream.synchronized { - if (used) traceStream.println(",") - else traceStream.println("[") - used = true - traceStream.print( - upickle.default.write( - TraceEvent( - name = task, - cat = cat, - ph = "X", - ts = startTime * 1000, - dur = (endTime - startTime) * 1000 /*chrome treats the duration as microseconds*/, - pid = 1, - tid = getThreadId(thread), - args = if (cached) Seq("cached") else Seq() - ) - ) - ) - } - } - def close(): Unit = { - traceStream.println("]") - traceStream.close() - } -} - -/** - * Trace Event Format, that can be loaded with Google Chrome via chrome://tracing - * See https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/ - */ -private case class TraceEvent( - name: String, - cat: String, - ph: String, - ts: Long, - dur: Long, - pid: Int, - tid: Int, - args: Seq[String] -) -private object TraceEvent { - implicit val readWrite: upickle.default.ReadWriter[TraceEvent] = upickle.default.macroRW -} diff --git a/main/eval/src/mill/eval/Plan.scala b/main/eval/src/mill/eval/Plan.scala new file mode 100644 index 00000000000..ff50394b5c0 --- /dev/null +++ b/main/eval/src/mill/eval/Plan.scala @@ -0,0 +1,45 @@ +package mill.eval +import mill.api.Loose.Agg +import mill.api.Strict +import mill.define.{NamedTask, Segment, Segments, Task} +import mill.util.MultiBiMap + +private object Plan { + def plan(goals: Agg[Task[_]]): (MultiBiMap[Terminal, Task[_]], Strict.Agg[Task[_]]) = { + val transitive = Graph.transitiveTargets(goals) + val topoSorted = Graph.topoSorted(transitive) + val seen = collection.mutable.Set.empty[Segments] + val overridden = collection.mutable.Set.empty[Task[_]] + topoSorted.values.reverse.iterator.foreach { + case x: NamedTask[_] if x.isPrivate == Some(true) => + // we always need to store them in the super-path + overridden.add(x) + case x: NamedTask[_] => + if (!seen.contains(x.ctx.segments)) seen.add(x.ctx.segments) + else overridden.add(x) + case _ => // donothing + } + + val sortedGroups: MultiBiMap[Terminal, Task[_]] = + Graph.groupAroundImportantTargets(topoSorted) { + // important: all named tasks and those explicitly requested + case t: NamedTask[Any] => + val segments = t.ctx.segments + val augmentedSegments = + if (!overridden(t)) segments + else { + val Segment.Label(tName) = segments.value.last + Segments( + segments.value.init ++ + Seq(Segment.Label(tName + ".super")) ++ + t.ctx.enclosing.split("[.# ]").map(Segment.Label) + ) + } + Terminal.Labelled(t, augmentedSegments) + + case t if goals.contains(t) => Terminal.Task(t) + } + + (sortedGroups, transitive) + } +} diff --git a/main/eval/src/mill/eval/Terminal.scala b/main/eval/src/mill/eval/Terminal.scala new file mode 100644 index 00000000000..a8b32658a29 --- /dev/null +++ b/main/eval/src/mill/eval/Terminal.scala @@ -0,0 +1,40 @@ +package mill.eval + +import mill.define.{NamedTask, Segment, Segments} + +/** + * A terminal or terminal target is some important work unit, that in most cases has a name (Right[Labelled]) + * or was directly called by the user (Left[Task]). + * It's a T, T.worker, T.input, T.source, T.sources, T.persistent + */ +sealed trait Terminal { + def render: String +} + +object Terminal { + case class Labelled[T](task: NamedTask[T], segments: Segments) extends Terminal { + def render = segments.render + } + + case class Task[T](task: mill.define.Task[_]) extends Terminal { + def render = task.toString + } + + def destSegments(labelledTask: Terminal.Labelled[_]): Segments = { + labelledTask.task.ctx.foreign match { + case Some(foreignSegments) => foreignSegments ++ labelledTask.segments + case None => labelledTask.segments + } + } + + def printTerm(term: Terminal): String = term match { + case Terminal.Task(task) => task.toString() + case labelled: Terminal.Labelled[_] => + val Seq(first, rest @ _*) = destSegments(labelled).value + val msgParts = Seq(first.asInstanceOf[Segment.Label].value) ++ rest.map { + case Segment.Label(s) => "." + s + case Segment.Cross(s) => "[" + s.mkString(",") + "]" + } + msgParts.mkString + } +} diff --git a/main/eval/src/mill/eval/ThreadNumberer.scala b/main/eval/src/mill/eval/ThreadNumberer.scala new file mode 100644 index 00000000000..793693bbc97 --- /dev/null +++ b/main/eval/src/mill/eval/ThreadNumberer.scala @@ -0,0 +1,12 @@ +package mill.eval + +/** + * Small class to take named threads and assign them stable integer IDs + */ +class ThreadNumberer() { + private val threadIds = collection.mutable.Map.empty[Thread, Int] + + def getThreadId(thread: Thread) = synchronized { + threadIds.getOrElseUpdate(thread, threadIds.size) + } +} diff --git a/main/src/mill/main/MainModule.scala b/main/src/mill/main/MainModule.scala index 7f293e75b2b..1b127d8b2e1 100644 --- a/main/src/mill/main/MainModule.scala +++ b/main/src/mill/main/MainModule.scala @@ -5,7 +5,7 @@ import mill.T import mill.main.BuildInfo import mill.api.{Ctx, Logger, PathRef, Result} import mill.define.{Command, NamedTask, Segments, Task} -import mill.eval.{Evaluator, EvaluatorPaths} +import mill.eval.{Evaluator, EvaluatorPaths, Terminal} import mill.resolve.{Resolve, SelectMode} import mill.resolve.SelectMode.Separated import mill.util.{PrintLogger, Watchable} @@ -147,7 +147,7 @@ trait MainModule extends mill.Module { case Left(err) => Left(err) case Right(rs) => val (sortedGroups, _) = evaluator.plan(rs) - Right(sortedGroups.keys().collect { case r: Evaluator.Terminal.Labelled[_] => r }.toArray) + Right(sortedGroups.keys().collect { case r: Terminal.Labelled[_] => r }.toArray) } }