Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
lihaoyi committed Sep 7, 2024
1 parent b251aeb commit 229cc4d
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 140 deletions.
8 changes: 4 additions & 4 deletions example/depth/sandbox/1-task/build.mill
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// the filesystem operations of other tasks that may be occurring in parallel.
//
//
// == `T.dest`
// === `T.dest`
// The standard way of working with a task's `.dest/` folder is through the `T.dest`
// property. This is available within any task, and gives you access to the
// `out/<module-names>/<task-name>.dest/` folder to use. The `.dest/` folder for
Expand All @@ -24,7 +24,7 @@ object foo extends Module{
*/


// == Task `os.pwd` redirection
// === Task `os.pwd` redirection
// Mill also redirects the `os.pwd` property from https://github.com/com-lihaoyi/os-lib[OS-Lib],
// such that that also points towards a running task's own `.dest/` folder

Expand All @@ -48,7 +48,7 @@ def osProcTask = T {
.../out/osProcTask.dest
*/

// == Non-task `os.pwd` redirection
// === Non-task `os.pwd` redirection
//
// Lastly, there is the possibily of calling `os.pwd` outside of a task. When outside of
// a task there is no `.dest/` folder associated, so instead Mill will redirect `os.pwd`
Expand All @@ -63,7 +63,7 @@ def externalPwdTask = T { println(externalPwd.toString) }
*/


// == Limitations of Mill's Sandboxing
// === Limitations of Mill's Sandboxing
//
// Mill's approach to filesystem sandboxing is designed to avoid accidental interference
// between different Mill tasks. It is not designed to block intentional misbehavior, and
Expand Down
11 changes: 7 additions & 4 deletions example/depth/sandbox/2-test/build.mill
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,20 @@ object bar extends MyModule
//
// - `foo.test` runs in `out/foo/test/test.dest/`
// - `bar.test` runs in `out/bar/test/test.dest/`
//
// As a result, each test's `generated.html` file is written to its own dedicated
// working directory, without colliding with each other on disk:

/** Usage

> find . | grep generated.html
.../out/foo/test/test.dest/foo.FooTests/sandbox/generated.html
.../out/bar/test/test.dest/bar.BarTests/sandbox/generated.html
.../out/foo/test/test.dest/sandbox/generated.html
.../out/bar/test/test.dest/sandbox/generated.html

> cat out/foo/test/test.dest/foo.FooTests/sandbox/generated.html
> cat out/foo/test/test.dest/sandbox/generated.html
<h1>hello</h1>

> cat out/bar/test/test.dest/bar.BarTests/sandbox/generated.html
> cat out/bar/test/test.dest/sandbox/generated.html
<p>world</p>

*/
Expand Down
7 changes: 0 additions & 7 deletions main/api/src/mill/api/BlockableExecutionContext.scala

This file was deleted.

5 changes: 2 additions & 3 deletions main/api/src/mill/api/Ctx.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ class Ctx(
val reporter: Int => Option[CompileProblemReporter],
val testReporter: TestReporter,
val workspace: os.Path,
val systemExit: Int => Nothing,
val executionContext: BlockableExecutionContext
val systemExit: Int => Nothing
) extends Ctx.Dest
with Ctx.Log
with Ctx.Args
Expand All @@ -137,7 +136,7 @@ class Ctx(
testReporter: TestReporter,
workspace: os.Path
) = {
this(args, dest0, log, home, env, reporter, testReporter, workspace, i => ???, null)
this(args, dest0, log, home, env, reporter, testReporter, workspace, i => ???)
}
def dest: os.Path = dest0()
def arg[T](index: Int): T = {
Expand Down
7 changes: 3 additions & 4 deletions main/eval/src/mill/eval/EvaluatorCore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {
logger: ColorLogger,
reporter: Int => Option[CompileProblemReporter] = _ => Option.empty[CompileProblemReporter],
testReporter: TestReporter = DummyTestReporter,
ec: BlockableExecutionContext,
ec: ExecutionContext with AutoCloseable,
contextLoggerMsg0: Int => String,
serialCommandExec: Boolean
): Evaluator.Results = {
Expand All @@ -91,7 +91,7 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {
precomputeMethodNamesPerClass(sortedGroups)

def evaluateTerminals(terminals: Seq[Terminal], contextLoggerMsg: Int => String)(implicit
executionContext: BlockableExecutionContext
ec: ExecutionContext
) = {
// We walk the task graph in topological order and schedule the futures
// to run asynchronously. During this walk, we store the scheduled futures
Expand Down Expand Up @@ -126,8 +126,7 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {
testReporter = testReporter,
logger = contextLogger,
classToTransitiveClasses,
allTransitiveClassMethods,
executionContext
allTransitiveClassMethods
)

if (failFast && res.newResults.values.exists(_.result.asSuccess.isEmpty))
Expand Down
30 changes: 5 additions & 25 deletions main/eval/src/mill/eval/ExecutionContexts.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
package mill.eval

import mill.api.BlockableExecutionContext

import java.util.concurrent.ForkJoinPool.ManagedBlocker
import java.util.concurrent.{
ExecutorService,
ForkJoinPool,
}
import scala.concurrent.ExecutionContext
import java.util.concurrent.ExecutorService

private object ExecutionContexts {

Expand All @@ -15,8 +10,7 @@ private object ExecutionContexts {
* spawning a separate thread or thread-pool. Used to turn parallel-async
* Future code into nice single-threaded code without needing to rewrite it
*/
object RunNow extends BlockableExecutionContext {
def blocking[T](t: => T): T = t
object RunNow extends ExecutionContext with AutoCloseable {
def execute(runnable: Runnable): Unit = runnable.run()
def reportFailure(cause: Throwable): Unit = {}
def close(): Unit = () // do nothing
Expand All @@ -26,22 +20,8 @@ private object ExecutionContexts {
* A simple thread-pool-based ExecutionContext with configurable thread count
* and AutoCloseable support
*/
class ThreadPool(threadCount: Int) extends BlockableExecutionContext {
val forkJoinPool: ForkJoinPool = new ForkJoinPool(threadCount)
val threadPool: ExecutorService = forkJoinPool

def blocking[T](t: => T): T = {
@volatile var res: Option[T] = None
ForkJoinPool.managedBlock(new ManagedBlocker {
def block(): Boolean = {
if (res.isEmpty) res = Some(t)
true
}
def isReleasable: Boolean = res.nonEmpty
})
res.get
}

class ThreadPool(threadCount: Int) extends ExecutionContext with AutoCloseable {
val threadPool: ExecutorService = java.util.concurrent.Executors.newFixedThreadPool(threadCount)
def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
def reportFailure(t: Throwable): Unit = {}
def close(): Unit = threadPool.shutdown()
Expand Down
15 changes: 5 additions & 10 deletions main/eval/src/mill/eval/GroupEvaluator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ private[mill] trait GroupEvaluator {
testReporter: TestReporter,
logger: ColorLogger,
classToTransitiveClasses: Map[Class[_], IndexedSeq[Class[_]]],
allTransitiveClassMethods: Map[Class[_], Map[String, Method]],
executionContext: BlockableExecutionContext
allTransitiveClassMethods: Map[Class[_], Map[String, Method]]
): GroupEvaluator.Results = synchronizedEval(
terminal,
onCollision =
Expand Down Expand Up @@ -164,8 +163,7 @@ private[mill] trait GroupEvaluator {
counterMsg = counterMsg,
zincProblemReporter,
testReporter,
logger,
executionContext
logger
)
GroupEvaluator.Results(newResults, newEvaluated.toSeq, null, inputsHash, -1)

Expand Down Expand Up @@ -214,8 +212,7 @@ private[mill] trait GroupEvaluator {
counterMsg = counterMsg,
zincProblemReporter,
testReporter,
logger,
executionContext = executionContext
logger
)
}

Expand Down Expand Up @@ -255,8 +252,7 @@ private[mill] trait GroupEvaluator {
counterMsg: String,
reporter: Int => Option[CompileProblemReporter],
testReporter: TestReporter,
logger: mill.api.Logger,
executionContext: BlockableExecutionContext
logger: mill.api.Logger
): (Map[Task[_], TaskResult[(Val, Int)]], mutable.Buffer[Task[_]]) = {

def computeAll(enableTicker: Boolean) = {
Expand Down Expand Up @@ -319,8 +315,7 @@ private[mill] trait GroupEvaluator {
reporter = reporter,
testReporter = testReporter,
workspace = workspace,
systemExit = systemExit,
executionContext = executionContext
systemExit = systemExit
) with mill.api.Ctx.Jobs {
override def jobs: Int = effectiveThreadCount
}
Expand Down
130 changes: 47 additions & 83 deletions scalalib/src/mill/scalalib/TestModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import sbt.testing.Status
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.time.{Instant, LocalDateTime, ZoneId}
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.xml.Elem

trait TestModule
Expand Down Expand Up @@ -99,13 +97,6 @@ trait TestModule
testTask(testCachedArgs, T.task { Seq.empty[String] })()
}

/**
* Make this test module each individual test classes in parallel in separate JVMs
* and sandbox folders. Setting it to `false` will fall back to running all
* test classes in this module in a single JVM in a single sandbox folder
*/
def testParallelizeClasses: T[Boolean] = true

/**
* Discovers and runs the module's tests in a subprocess, reporting the
* results to the console.
Expand Down Expand Up @@ -188,9 +179,10 @@ trait TestModule
*/
protected def testTask(
args: Task[Seq[String]],
globSelectors: Task[Seq[String]]): Task[(String, Seq[TestResult])] =
globSelectors: Task[Seq[String]]
): Task[(String, Seq[TestResult])] =
T.task {

val outputPath = T.dest / "out.json"
val useArgsFile = testUseArgsFile()

val (jvmArgs, props: Map[String, String]) =
Expand All @@ -212,87 +204,59 @@ trait TestModule

val selectors = globSelectors()

val testArgs = TestArgs(
framework = testFramework(),
classpath = runClasspath().map(_.path),
arguments = args(),
sysProps = props,
outputPath = outputPath,
colored = T.log.colored,
testCp = testClasspath().map(_.path),
home = T.home,
globSelectors = selectors
)

val testRunnerClasspathArg = zincWorker().scalalibClasspath()
.map(_.path.toNIO.toUri.toURL)
.mkString(",")

val argsFile = T.dest / "testargs"
os.write(argsFile, upickle.default.write(testArgs))
val mainArgs = Seq(testRunnerClasspathArg, argsFile.toString)

os.makeDir(T.dest / "sandbox")
val resourceEnv =
Map(EnvVars.MILL_TEST_RESOURCE_FOLDER -> resources().map(_.path).mkString(";"))
Jvm.runSubprocess(
mainClass = "mill.testrunner.entrypoint.TestRunnerMain",
classPath =
(runClasspath() ++ zincWorker().testrunnerEntrypointClasspath()).map(
_.path
),
jvmArgs = jvmArgs,
envArgs = forkEnv() ++ resourceEnv,
mainArgs = mainArgs,
workingDir = if (testSandboxWorkingDir()) T.dest / "sandbox" else forkWorkingDir(),
useCpPassingJar = useArgsFile
)

def runTestSubprocess(selectors2: Seq[String], base: os.Path) = {
val outputPath = base / "out.json"
val testArgs = TestArgs(
framework = testFramework(),
classpath = runClasspath().map(_.path),
arguments = args(),
sysProps = props,
outputPath = outputPath,
colored = T.log.colored,
testCp = testClasspath().map(_.path),
home = T.home,
globSelectors = selectors2
)

val argsFile = base / "testargs"
val sandbox = base / "sandbox"
os.write(argsFile, upickle.default.write(testArgs), createFolders = true)

os.makeDir.all(sandbox)

Jvm.runSubprocess(
mainClass = "mill.testrunner.entrypoint.TestRunnerMain",
classPath =
(runClasspath() ++ zincWorker().testrunnerEntrypointClasspath()).map(
_.path
),
jvmArgs = jvmArgs,
envArgs = forkEnv() ++ resourceEnv,
mainArgs = Seq(testRunnerClasspathArg, argsFile.toString),
workingDir = if (testSandboxWorkingDir()) sandbox else forkWorkingDir(),
useCpPassingJar = useArgsFile
)

if (!os.exists(outputPath)) Left(s"Test reporting Failed: ${outputPath} does not exist")
else Right(upickle.default.read[(String, Seq[TestResult])](ujson.read(outputPath.toIO)))
}

val subprocessResult: Either[String, (String, Seq[TestResult])] =
if (!testParallelizeClasses()) runTestSubprocess(selectors, T.dest)
else {
implicit val ec = T.ctx.executionContext
val futures =
for (testClassName <- discoveredTestClasses()) yield Future {
(testClassName, runTestSubprocess(Seq(testClassName), T.dest / testClassName))
}

val outputs = T.ctx.executionContext.blocking {
Await.result(Future.sequence(futures), Duration.Inf)
}

val (lefts, rights) = outputs.partitionMap {
case (name, Left(v)) => Left(name + " " + v)
case (name, Right((msg, results))) => Right((name + " " + msg, results))
if (!os.exists(outputPath)) Result.Failure("Test execution failed.")
else
try {
val jsonOutput = ujson.read(outputPath.toIO)
val (doneMsg, results) = {
upickle.default.read[(String, Seq[TestResult])](jsonOutput)
}

if (lefts.nonEmpty) Left(lefts.mkString("\n"))
else Right((rights.map(_._1).mkString("\n"), rights.flatMap(_._2)))
if (results.isEmpty && selectors.nonEmpty) {
// no tests ran but we expected some to run, as we applied a filter (e.g. via `testOnly`)
Result.Failure(
s"Test selector does not match any test: ${selectors.mkString(" ")}" + "\nRun discoveredTestClasses to see available tests"
)
} else TestModule.handleResults(doneMsg, results, T.ctx(), testReportXml())
} catch {
case e: Throwable =>
Result.Failure("Test reporting failed: " + e)
}

subprocessResult match {
case Left(errMsg) => Result.Failure(errMsg)
case Right((doneMsg, results)) =>
try {
if (results.isEmpty && selectors.nonEmpty) {
// no tests ran but we expected some to run, as we applied a filter (e.g. via `testOnly`)
Result.Failure(
s"Test selector does not match any test: ${selectors.mkString(" ")}" + "\nRun discoveredTestClasses to see available tests"
)
} else TestModule.handleResults(doneMsg, results, T.ctx(), testReportXml())
} catch {
case e: Throwable =>
Result.Failure("Test reporting failed: " + e)
}
}
}

/**
Expand Down

0 comments on commit 229cc4d

Please sign in to comment.