From 26cbc8adee1bca52de9cb0c3fb8297c450c51d53 Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Tue, 1 Oct 2024 23:38:35 +0800 Subject: [PATCH] Concurrency fixes (#3642) * Swap from `ForkJoinPool` to `ThreadPoolExecutor` * `ForkJoinPool` does this weird thing where fork-join-tasks can be re-entrant at `join` points, resulting in weird scenarios where a mill-task that hits a yield point (e.g. inside Zinc / parallel-collections / FJP) can start running a second mill-task even before the first has finished, violating all sorts of invariants (# of running tasks exceeds `--jobs`, `FixSizeCache` semaphores get taken twice by the same thread, all sorts of craziness) * We replace `ForkJoinPool#ManagedBlocker` with our own manual logic increasing and decreasing the `ThreadPoolExecutor`s `maximumPoolSize` and `corePoolSize` in our `blocking{...}` wrapper * We need to `Thread#interrupt()` the `promptUpdaterThread` thread when we close the `PromptLogger`, so we don't need to wait the `promptUpdateInterval` (0.1ms for interactive, 60s for non-interactive) before exiting This should fix some of the flakiness we've been seeing in master, that seems to have started from https://github.com/com-lihaoyi/mill/commit/05bef7ebd6aaa03365ecb68e32dcad43ba58462e (just eyeballing the CI history), and blocking our re-bootstrapping in https://github.com/com-lihaoyi/mill/pull/3637 --- .../src/mill/eval/ExecutionContexts.scala | 39 ++++++++++++------- main/util/src/mill/util/PromptLogger.scala | 16 ++++---- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/main/eval/src/mill/eval/ExecutionContexts.scala b/main/eval/src/mill/eval/ExecutionContexts.scala index 146795e9e27..d0de10c84ab 100644 --- a/main/eval/src/mill/eval/ExecutionContexts.scala +++ b/main/eval/src/mill/eval/ExecutionContexts.scala @@ -1,10 +1,10 @@ package mill.eval import os.Path + import scala.concurrent.{Await, Future} import scala.concurrent.duration.Duration -import java.util.concurrent.ForkJoinPool.ManagedBlocker -import java.util.concurrent.{ExecutorService, ForkJoinPool} +import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} private object ExecutionContexts { @@ -29,21 +29,32 @@ private object ExecutionContexts { * A simple thread-pool-based ExecutionContext with configurable thread count * and AutoCloseable support */ - class ThreadPool(threadCount: Int) extends mill.api.Ctx.Fork.Impl { + class ThreadPool(threadCount0: Int) extends mill.api.Ctx.Fork.Impl { def await[T](t: Future[T]): T = blocking { Await.result(t, Duration.Inf) } - val forkJoinPool: ForkJoinPool = new ForkJoinPool(threadCount) - val threadPool: ExecutorService = forkJoinPool + val executor: ThreadPoolExecutor = new ThreadPoolExecutor( + threadCount0, + threadCount0, + 0, + TimeUnit.SECONDS, + new LinkedBlockingQueue[Runnable]() + ) + + val threadPool: ExecutorService = executor + + def updateThreadCount(delta: Int): Unit = synchronized { + if (delta > 0) { + executor.setMaximumPoolSize(executor.getMaximumPoolSize + delta) + executor.setCorePoolSize(executor.getCorePoolSize + delta) + } else { + executor.setCorePoolSize(executor.getCorePoolSize + delta) + executor.setMaximumPoolSize(executor.getMaximumPoolSize + delta) + } + } def blocking[T](t: => T): T = { - @volatile var res: Option[T] = None - ForkJoinPool.managedBlock(new ManagedBlocker { - def block(): Boolean = { - if (res.isEmpty) res = Some(t) - true - } - def isReleasable: Boolean = res.nonEmpty - }) - res.get + updateThreadCount(1) + try t + finally updateThreadCount(-1) } def execute(runnable: Runnable): Unit = { diff --git a/main/util/src/mill/util/PromptLogger.scala b/main/util/src/mill/util/PromptLogger.scala index a68e2db921a..0591fb5cf4c 100644 --- a/main/util/src/mill/util/PromptLogger.scala +++ b/main/util/src/mill/util/PromptLogger.scala @@ -59,7 +59,8 @@ private[mill] class PromptLogger( if (termDimensions._1.isDefined) promptUpdateIntervalMillis else nonInteractivePromptUpdateIntervalMillis - Thread.sleep(promptUpdateInterval) + try Thread.sleep(promptUpdateInterval) + catch { case e: InterruptedException => /*do nothing*/ } if (!paused) { synchronized { @@ -85,8 +86,9 @@ private[mill] class PromptLogger( override def setPromptLeftHeader(s: String): Unit = synchronized { state.updateGlobal(s) } override def clearPromptStatuses(): Unit = synchronized { state.clearStatuses() } - override def removePromptLine(key: Seq[String]): Unit = - synchronized { state.updateCurrent(key, None) } + override def removePromptLine(key: Seq[String]): Unit = synchronized { + state.updateCurrent(key, None) + } def ticker(s: String): Unit = () override def setPromptDetail(key: Seq[String], s: String): Unit = { @@ -111,9 +113,8 @@ private[mill] class PromptLogger( synchronized { state.updateCurrent(key, Some(s"[${key.mkString("-")}] $message")) seenIdentifiers(key) = (verboseKeySuffix, message) - super.setPromptLine(key.map(infoColor(_).toString()), verboseKeySuffix, message) - } + def debug(s: String): Unit = synchronized { if (debugEnabled) systemStreams.err.println(s) } override def rawOutputStream: PrintStream = systemStreams0.out @@ -126,6 +127,7 @@ private[mill] class PromptLogger( } // Needs to be outside the lock so we don't deadlock with `promptUpdaterThread` // trying to take the lock one last time before exiting + promptUpdaterThread.interrupt() promptUpdaterThread.join() } @@ -140,7 +142,7 @@ private[mill] class PromptLogger( // After the prompt gets paused, wait until the `promptUpdaterThread` marks // `pauseNoticed = true`, so we can be sure it's done printing out prompt updates for // now and we can proceed with running `t` without any last updates slipping through - while (!pauseNoticed) Thread.sleep(1) + while (!pauseNoticed) Thread.sleep(2) // Clear the prompt so the code in `t` has a blank terminal to work with systemStreams0.err.write(AnsiNav.clearScreen(0).getBytes) systemStreams0.err.flush() @@ -174,7 +176,7 @@ private[mill] object PromptLogger { ) def awaitPumperEmpty(): Unit = { - while (pipe.input.available() != 0) Thread.sleep(10) + while (pipe.input.available() != 0) Thread.sleep(2) } object pumper extends ProxyStream.Pumper(pipe.input, systemStreams0.out, systemStreams0.err) { object PumperState extends Enumeration {