From c21d90ec2ee5b3b36848861e1f663c27eff1a952 Mon Sep 17 00:00:00 2001 From: Pavel Savara Date: Tue, 10 Sep 2024 02:40:00 +0200 Subject: [PATCH] [WASI] improve single-threaded threadpool (#107395) * fix https://github.com/dotnet/runtime/issues/104803 * PollWasiEventLoopUntilResolvedVoid * more * wip * CPU-bound work to do * fix exit * Update src/mono/sample/wasi/http-p2/Program.cs Co-authored-by: Larry Ewing * feedback --------- Co-authored-by: Larry Ewing --- .../tests/WasmTestRunner/WasmTestRunner.cs | 2 +- .../src/System/Threading/Thread.Unix.cs | 20 ++-- .../System/Threading/ThreadPoolWorkQueue.cs | 3 - .../System/Threading/Wasi/WasiEventLoop.cs | 97 ++++++++++++++++--- .../System/Threading/ThreadPool.Wasi.Mono.cs | 2 +- src/mono/sample/wasi/http-p2/Program.cs | 3 +- src/mono/wasi/testassets/Http.cs | 2 +- 7 files changed, 94 insertions(+), 35 deletions(-) diff --git a/src/libraries/Common/tests/WasmTestRunner/WasmTestRunner.cs b/src/libraries/Common/tests/WasmTestRunner/WasmTestRunner.cs index 1cf1b35cf291a1..8c393b95f69c21 100644 --- a/src/libraries/Common/tests/WasmTestRunner/WasmTestRunner.cs +++ b/src/libraries/Common/tests/WasmTestRunner/WasmTestRunner.cs @@ -20,7 +20,7 @@ public static int Main(string[] args) return PollWasiEventLoopUntilResolved((Thread)null!, MainAsync(args)); [UnsafeAccessor(UnsafeAccessorKind.StaticMethod, Name = "PollWasiEventLoopUntilResolved")] - static extern int PollWasiEventLoopUntilResolved(Thread t, Task mainTask); + static extern T PollWasiEventLoopUntilResolved(Thread t, Task mainTask); } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Thread.Unix.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Thread.Unix.cs index a3302b3bedc38c..592f0244e011a6 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/Thread.Unix.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Thread.Unix.cs @@ -18,22 +18,16 @@ internal static System.Threading.Tasks.Task RegisterWasiPollableHandle(int handl return WasiEventLoop.RegisterWasiPollableHandle(handle, cancellationToken); } - internal static int PollWasiEventLoopUntilResolved(Task mainTask) + internal static T PollWasiEventLoopUntilResolved(Task mainTask) { - while (!mainTask.IsCompleted) - { - WasiEventLoop.DispatchWasiEventLoop(); - } - var exception = mainTask.Exception; - if (exception is not null) - { - throw exception; - } - - return mainTask.Result; + return WasiEventLoop.PollWasiEventLoopUntilResolved(mainTask); } -#endif + internal static void PollWasiEventLoopUntilResolvedVoid(Task mainTask) + { + WasiEventLoop.PollWasiEventLoopUntilResolvedVoid(mainTask); + } +#endif // TARGET_WASI // the closest analog to Sleep(0) on Unix is sched_yield internal static void UninterruptibleSleep0() => Thread.Yield(); diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 279932486e58a6..69a59198a40f4e 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -906,10 +906,7 @@ internal static bool Dispatch() // thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing // work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of // Scheduled, and try to dequeue again or request another thread. -#if !TARGET_WASI - // TODO https://github.com/dotnet/runtime/issues/104803 Debug.Assert(workQueue._separated.queueProcessingStage == QueueProcessingStage.Scheduled); -#endif workQueue._separated.queueProcessingStage = QueueProcessingStage.Determining; Interlocked.MemoryBarrier(); diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Wasi/WasiEventLoop.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Wasi/WasiEventLoop.cs index e8e7b20dc34736..f2a5d898566cd5 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/Wasi/WasiEventLoop.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Wasi/WasiEventLoop.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using WasiPollWorld.wit.imports.wasi.io.v0_2_1; using Pollable = WasiPollWorld.wit.imports.wasi.io.v0_2_1.IPoll.Pollable; +using MonotonicClockInterop = WasiPollWorld.wit.imports.wasi.clocks.v0_2_1.MonotonicClockInterop; namespace System.Threading { @@ -14,7 +15,9 @@ internal static class WasiEventLoop // it will be leaked and stay in this list forever. // it will also keep the Pollable handle alive and prevent it from being disposed private static readonly List s_pollables = new(); - private static bool s_tasksCanceled; + private static bool s_checkScheduled; + private static Pollable? s_resolvedPollable; + private static Task? s_mainTask; internal static Task RegisterWasiPollableHandle(int handle, CancellationToken cancellationToken) { @@ -29,18 +32,70 @@ internal static Task RegisterWasiPollable(Pollable pollable, CancellationToken c // this will register the pollable holder into s_pollables var holder = new PollableHolder(pollable, cancellationToken); s_pollables.Add(holder); + + ScheduleCheck(); + return holder.taskCompletionSource.Task; } - // this is not thread safe - internal static void DispatchWasiEventLoop() + + internal static T PollWasiEventLoopUntilResolved(Task mainTask) + { + try + { + s_mainTask = mainTask; + while (!mainTask.IsCompleted) + { + ThreadPoolWorkQueue.Dispatch(); + } + } + finally + { + s_mainTask = null; + } + var exception = mainTask.Exception; + if (exception is not null) + { + throw exception; + } + + return mainTask.Result; + } + + internal static void PollWasiEventLoopUntilResolvedVoid(Task mainTask) + { + try + { + s_mainTask = mainTask; + while (!mainTask.IsCompleted) + { + ThreadPoolWorkQueue.Dispatch(); + } + } + finally + { + s_mainTask = null; + } + + var exception = mainTask.Exception; + if (exception is not null) + { + throw exception; + } + } + + internal static void ScheduleCheck() { - ThreadPoolWorkQueue.Dispatch(); - if (s_tasksCanceled) + if (!s_checkScheduled && s_pollables.Count > 0) { - s_tasksCanceled = false; - return; + s_checkScheduled = true; + ThreadPool.UnsafeQueueUserWorkItem(CheckPollables, null); } + } + + internal static void CheckPollables(object? _) + { + s_checkScheduled = false; var holders = new List(s_pollables.Count); var pending = new List(s_pollables.Count); @@ -58,13 +113,28 @@ internal static void DispatchWasiEventLoop() if (pending.Count > 0) { + var resolvedPollableIndex = -1; + // if there is CPU-bound work to do, we should not block on PollInterop.Poll below + // so we will append pollable resolved in 0ms + // in effect, the PollInterop.Poll would not block us + if (ThreadPool.PendingWorkItemCount > 0 || (s_mainTask != null && s_mainTask.IsCompleted)) + { + s_resolvedPollable ??= MonotonicClockInterop.SubscribeDuration(0); + resolvedPollableIndex = pending.Count; + pending.Add(s_resolvedPollable); + } + var readyIndexes = PollInterop.Poll(pending); for (int i = 0; i < readyIndexes.Length; i++) { uint readyIndex = readyIndexes[i]; - var holder = holders[(int)readyIndex]; - holder.ResolveAndDispose(); + if (resolvedPollableIndex != readyIndex) + { + var holder = holders[(int)readyIndex]; + holder.ResolveAndDispose(); + } } + for (int i = 0; i < holders.Count; i++) { PollableHolder holder = holders[i]; @@ -73,6 +143,8 @@ internal static void DispatchWasiEventLoop() s_pollables.Add(holder); } } + + ScheduleCheck(); } } @@ -112,7 +184,7 @@ public void ResolveAndDispose() } // for GC of abandoned Tasks or for cancellation - private static void CancelAndDispose(object? s) + public static void CancelAndDispose(object? s) { PollableHolder self = (PollableHolder)s!; if (self.isDisposed) @@ -120,11 +192,6 @@ private static void CancelAndDispose(object? s) return; } - // Tell event loop to exit early, giving the application a - // chance to quit if the task(s) it is interested in have - // completed. - s_tasksCanceled = true; - // it will be removed from s_pollables on the next run self.isDisposed = true; self.pollable.Dispose(); diff --git a/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Wasi.Mono.cs b/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Wasi.Mono.cs index bad9fbdbaaddf5..0d082fd863cb95 100644 --- a/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Wasi.Mono.cs +++ b/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Wasi.Mono.cs @@ -35,7 +35,7 @@ public static partial class ThreadPool { // Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that // the runtime may use the thread for processing other work - internal static bool YieldFromDispatchLoop => false; + internal static bool YieldFromDispatchLoop => true; private const bool IsWorkerTrackingEnabledInConfig = false; diff --git a/src/mono/sample/wasi/http-p2/Program.cs b/src/mono/sample/wasi/http-p2/Program.cs index e940c0a4100b6b..9bf6140161db5b 100644 --- a/src/mono/sample/wasi/http-p2/Program.cs +++ b/src/mono/sample/wasi/http-p2/Program.cs @@ -13,6 +13,7 @@ public static class WasiMainWrapper { public static async Task MainAsync(string[] args) { + _ = Task.Delay(100_000_000); // create a task that will not complete before main await Task.Delay(100); GC.Collect(); // test that Pollable->Task is not collected until resolved @@ -78,7 +79,7 @@ public static int Main(string[] args) return PollWasiEventLoopUntilResolved((Thread)null!, MainAsync(args)); [UnsafeAccessor(UnsafeAccessorKind.StaticMethod, Name = "PollWasiEventLoopUntilResolved")] - static extern int PollWasiEventLoopUntilResolved(Thread t, Task mainTask); + static extern T PollWasiEventLoopUntilResolved(Thread t, Task mainTask); } } diff --git a/src/mono/wasi/testassets/Http.cs b/src/mono/wasi/testassets/Http.cs index 49bc33f28fa3c5..87b1f8437f71f0 100644 --- a/src/mono/wasi/testassets/Http.cs +++ b/src/mono/wasi/testassets/Http.cs @@ -83,6 +83,6 @@ public static int Main(string[] args) return PollWasiEventLoopUntilResolved((Thread)null!, MainAsync(args)); [UnsafeAccessor(UnsafeAccessorKind.StaticMethod, Name = "PollWasiEventLoopUntilResolved")] - static extern int PollWasiEventLoopUntilResolved(Thread t, Task mainTask); + static extern T PollWasiEventLoopUntilResolved(Thread t, Task mainTask); } }