Skip to content

Commit

Permalink
[WASI] improve single-threaded threadpool (dotnet#107395)
Browse files Browse the repository at this point in the history
* fix dotnet#104803

* PollWasiEventLoopUntilResolvedVoid

* more

* wip

* CPU-bound work to do

* fix exit

* Update src/mono/sample/wasi/http-p2/Program.cs

Co-authored-by: Larry Ewing <[email protected]>

* feedback

---------

Co-authored-by: Larry Ewing <[email protected]>
  • Loading branch information
2 people authored and sirntar committed Sep 30, 2024
1 parent d42e645 commit a20eea8
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public static int Main(string[] args)
return PollWasiEventLoopUntilResolved((Thread)null!, MainAsync(args));

[UnsafeAccessor(UnsafeAccessorKind.StaticMethod, Name = "PollWasiEventLoopUntilResolved")]
static extern int PollWasiEventLoopUntilResolved(Thread t, Task<int> mainTask);
static extern T PollWasiEventLoopUntilResolved<T>(Thread t, Task<T> mainTask);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,16 @@ internal static System.Threading.Tasks.Task RegisterWasiPollableHandle(int handl
return WasiEventLoop.RegisterWasiPollableHandle(handle, cancellationToken);
}

internal static int PollWasiEventLoopUntilResolved(Task<int> mainTask)
internal static T PollWasiEventLoopUntilResolved<T>(Task<T> mainTask)
{
while (!mainTask.IsCompleted)
{
WasiEventLoop.DispatchWasiEventLoop();
}
var exception = mainTask.Exception;
if (exception is not null)
{
throw exception;
}

return mainTask.Result;
return WasiEventLoop.PollWasiEventLoopUntilResolved<T>(mainTask);
}

#endif
internal static void PollWasiEventLoopUntilResolvedVoid(Task mainTask)
{
WasiEventLoop.PollWasiEventLoopUntilResolvedVoid(mainTask);
}
#endif // TARGET_WASI

// the closest analog to Sleep(0) on Unix is sched_yield
internal static void UninterruptibleSleep0() => Thread.Yield();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -906,10 +906,7 @@ internal static bool Dispatch()
// thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing
// work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of
// Scheduled, and try to dequeue again or request another thread.
#if !TARGET_WASI
// TODO https://github.com/dotnet/runtime/issues/104803
Debug.Assert(workQueue._separated.queueProcessingStage == QueueProcessingStage.Scheduled);
#endif
workQueue._separated.queueProcessingStage = QueueProcessingStage.Determining;
Interlocked.MemoryBarrier();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using WasiPollWorld.wit.imports.wasi.io.v0_2_1;
using Pollable = WasiPollWorld.wit.imports.wasi.io.v0_2_1.IPoll.Pollable;
using MonotonicClockInterop = WasiPollWorld.wit.imports.wasi.clocks.v0_2_1.MonotonicClockInterop;

namespace System.Threading
{
Expand All @@ -14,7 +15,9 @@ internal static class WasiEventLoop
// it will be leaked and stay in this list forever.
// it will also keep the Pollable handle alive and prevent it from being disposed
private static readonly List<PollableHolder> s_pollables = new();
private static bool s_tasksCanceled;
private static bool s_checkScheduled;
private static Pollable? s_resolvedPollable;
private static Task? s_mainTask;

internal static Task RegisterWasiPollableHandle(int handle, CancellationToken cancellationToken)
{
Expand All @@ -29,18 +32,70 @@ internal static Task RegisterWasiPollable(Pollable pollable, CancellationToken c
// this will register the pollable holder into s_pollables
var holder = new PollableHolder(pollable, cancellationToken);
s_pollables.Add(holder);

ScheduleCheck();

return holder.taskCompletionSource.Task;
}

// this is not thread safe
internal static void DispatchWasiEventLoop()

internal static T PollWasiEventLoopUntilResolved<T>(Task<T> mainTask)
{
try
{
s_mainTask = mainTask;
while (!mainTask.IsCompleted)
{
ThreadPoolWorkQueue.Dispatch();
}
}
finally
{
s_mainTask = null;
}
var exception = mainTask.Exception;
if (exception is not null)
{
throw exception;
}

return mainTask.Result;
}

internal static void PollWasiEventLoopUntilResolvedVoid(Task mainTask)
{
try
{
s_mainTask = mainTask;
while (!mainTask.IsCompleted)
{
ThreadPoolWorkQueue.Dispatch();
}
}
finally
{
s_mainTask = null;
}

var exception = mainTask.Exception;
if (exception is not null)
{
throw exception;
}
}

internal static void ScheduleCheck()
{
ThreadPoolWorkQueue.Dispatch();
if (s_tasksCanceled)
if (!s_checkScheduled && s_pollables.Count > 0)
{
s_tasksCanceled = false;
return;
s_checkScheduled = true;
ThreadPool.UnsafeQueueUserWorkItem(CheckPollables, null);
}
}

internal static void CheckPollables(object? _)
{
s_checkScheduled = false;

var holders = new List<PollableHolder>(s_pollables.Count);
var pending = new List<Pollable>(s_pollables.Count);
Expand All @@ -58,13 +113,28 @@ internal static void DispatchWasiEventLoop()

if (pending.Count > 0)
{
var resolvedPollableIndex = -1;
// if there is CPU-bound work to do, we should not block on PollInterop.Poll below
// so we will append pollable resolved in 0ms
// in effect, the PollInterop.Poll would not block us
if (ThreadPool.PendingWorkItemCount > 0 || (s_mainTask != null && s_mainTask.IsCompleted))
{
s_resolvedPollable ??= MonotonicClockInterop.SubscribeDuration(0);
resolvedPollableIndex = pending.Count;
pending.Add(s_resolvedPollable);
}

var readyIndexes = PollInterop.Poll(pending);
for (int i = 0; i < readyIndexes.Length; i++)
{
uint readyIndex = readyIndexes[i];
var holder = holders[(int)readyIndex];
holder.ResolveAndDispose();
if (resolvedPollableIndex != readyIndex)
{
var holder = holders[(int)readyIndex];
holder.ResolveAndDispose();
}
}

for (int i = 0; i < holders.Count; i++)
{
PollableHolder holder = holders[i];
Expand All @@ -73,6 +143,8 @@ internal static void DispatchWasiEventLoop()
s_pollables.Add(holder);
}
}

ScheduleCheck();
}
}

Expand Down Expand Up @@ -112,19 +184,14 @@ public void ResolveAndDispose()
}

// for GC of abandoned Tasks or for cancellation
private static void CancelAndDispose(object? s)
public static void CancelAndDispose(object? s)
{
PollableHolder self = (PollableHolder)s!;
if (self.isDisposed)
{
return;
}

// Tell event loop to exit early, giving the application a
// chance to quit if the task(s) it is interested in have
// completed.
s_tasksCanceled = true;

// it will be removed from s_pollables on the next run
self.isDisposed = true;
self.pollable.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static partial class ThreadPool
{
// Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that
// the runtime may use the thread for processing other work
internal static bool YieldFromDispatchLoop => false;
internal static bool YieldFromDispatchLoop => true;

private const bool IsWorkerTrackingEnabledInConfig = false;

Expand Down
3 changes: 2 additions & 1 deletion src/mono/sample/wasi/http-p2/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public static class WasiMainWrapper
{
public static async Task<int> MainAsync(string[] args)
{
_ = Task.Delay(100_000_000); // create a task that will not complete before main
await Task.Delay(100);
GC.Collect(); // test that Pollable->Task is not collected until resolved

Expand Down Expand Up @@ -78,7 +79,7 @@ public static int Main(string[] args)
return PollWasiEventLoopUntilResolved((Thread)null!, MainAsync(args));

[UnsafeAccessor(UnsafeAccessorKind.StaticMethod, Name = "PollWasiEventLoopUntilResolved")]
static extern int PollWasiEventLoopUntilResolved(Thread t, Task<int> mainTask);
static extern T PollWasiEventLoopUntilResolved<T>(Thread t, Task<T> mainTask);
}

}
2 changes: 1 addition & 1 deletion src/mono/wasi/testassets/Http.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,6 @@ public static int Main(string[] args)
return PollWasiEventLoopUntilResolved((Thread)null!, MainAsync(args));

[UnsafeAccessor(UnsafeAccessorKind.StaticMethod, Name = "PollWasiEventLoopUntilResolved")]
static extern int PollWasiEventLoopUntilResolved(Thread t, Task<int> mainTask);
static extern T PollWasiEventLoopUntilResolved<T>(Thread t, Task<T> mainTask);
}
}

0 comments on commit a20eea8

Please sign in to comment.