Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Queue instead of ConcurrentQueue in single-threaded thread pool #111245

Merged
merged 12 commits into from
Jan 20, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private void EnsureIOCompletionPollers()
}
}

private sealed unsafe class IOCompletionPoller
internal sealed unsafe class IOCompletionPoller
{
private const int NativeEventCapacity =
#if DEBUG
Expand All @@ -189,7 +189,7 @@ private sealed unsafe class IOCompletionPoller

private readonly nint _port;
private readonly Interop.Kernel32.OVERLAPPED_ENTRY* _nativeEvents;
private readonly ThreadPoolTypedWorkItemQueue<Event, Callback>? _events;
private readonly ThreadPoolTypedWorkItemQueue? _events;
private readonly Thread _thread;

public IOCompletionPoller(nint port)
Expand All @@ -202,7 +202,7 @@ public IOCompletionPoller(nint port)
_nativeEvents =
(Interop.Kernel32.OVERLAPPED_ENTRY*)
NativeMemory.Alloc(NativeEventCapacity, (nuint)sizeof(Interop.Kernel32.OVERLAPPED_ENTRY));
_events = new ThreadPoolTypedWorkItemQueue<Event, Callback>();
_events = new ThreadPoolTypedWorkItemQueue();

// These threads don't run user code, use a smaller stack size
_thread = new Thread(Poll, SmallStackSizeBytes);
Expand Down Expand Up @@ -299,29 +299,26 @@ private void PollAndInlineCallbacks()
}
}

private struct Callback : IThreadPoolTypedWorkItemQueueCallback<Event>
internal readonly partial struct Event
{
public static void Invoke(Event e)
public void Invoke()
{
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolIODequeue(e.nativeOverlapped);
NativeRuntimeEventSource.Log.ThreadPoolIODequeue(nativeOverlapped);
}

// The NtStatus code for the operation is in the InternalLow field
uint ntStatus = (uint)(nint)e.nativeOverlapped->InternalLow;
uint ntStatus = (uint)(nint)nativeOverlapped->InternalLow;
uint errorCode = Interop.Errors.ERROR_SUCCESS;
if (!Interop.StatusOptions.NT_SUCCESS(ntStatus))
{
errorCode = Interop.NtDll.RtlNtStatusToDosError((int)ntStatus);
}

IOCompletionCallbackHelper.PerformSingleIOCompletionCallback(errorCode, e.bytesTransferred, e.nativeOverlapped);
IOCompletionCallbackHelper.PerformSingleIOCompletionCallback(errorCode, bytesTransferred, nativeOverlapped);
}
}

private readonly struct Event
{
public readonly NativeOverlapped* nativeOverlapped;
public readonly uint bytesTransferred;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@
using System.Runtime.Versioning;
using System.Threading.Tasks;

#if FEATURE_SINGLE_THREADED
using WorkQueue = System.Collections.Generic.Queue<object>;
#else
using WorkQueue = System.Collections.Concurrent.ConcurrentQueue<object>;
#endif
#if TARGET_WINDOWS
using IOCompletionPollerEvent = System.Threading.PortableThreadPool.IOCompletionPoller.Event;
#endif // TARGET_WINDOWS


namespace System.Threading
{
/// <summary>
Expand Down Expand Up @@ -403,19 +413,19 @@ public int Count
private bool _mayHaveHighPriorityWorkItems;

// SOS's ThreadPool command depends on the following names
internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>();
internal readonly ConcurrentQueue<object> highPriorityWorkItems = new ConcurrentQueue<object>();
internal readonly WorkQueue workItems = new WorkQueue();
internal readonly WorkQueue highPriorityWorkItems = new WorkQueue();

#if CORECLR
internal readonly ConcurrentQueue<object> lowPriorityWorkItems =
s_prioritizationExperiment ? new ConcurrentQueue<object>() : null!;
internal readonly WorkQueue lowPriorityWorkItems =
s_prioritizationExperiment ? new WorkQueue() : null!;
#endif

// SOS's ThreadPool command depends on the following name. The global queue doesn't scale well beyond a point of
// concurrency. Some additional queues may be added and assigned to a limited number of worker threads if necessary to
// help with limiting the concurrency level.
internal readonly ConcurrentQueue<object>[] _assignableWorkItemQueues =
new ConcurrentQueue<object>[s_assignableWorkItemQueueCount];
internal readonly WorkQueue[] _assignableWorkItemQueues =
new WorkQueue[s_assignableWorkItemQueueCount];

private readonly LowLevelLock _queueAssignmentLock = new();
private readonly int[] _assignedWorkItemQueueThreadCounts =
Expand Down Expand Up @@ -455,7 +465,7 @@ public ThreadPoolWorkQueue()
{
for (int i = 0; i < s_assignableWorkItemQueueCount; i++)
{
_assignableWorkItemQueues[i] = new ConcurrentQueue<object>();
_assignableWorkItemQueues[i] = new WorkQueue();
}

RefreshLoggingEnabled();
Expand Down Expand Up @@ -560,7 +570,7 @@ private void UnassignWorkItemQueue(ThreadPoolWorkQueueThreadLocals tl)
// This queue is not assigned to any other worker threads. Move its work items to the global queue to prevent them
// from being starved for a long duration.
bool movedWorkItem = false;
ConcurrentQueue<object> queue = tl.assignedGlobalWorkItemQueue;
WorkQueue queue = tl.assignedGlobalWorkItemQueue;
while (_assignedWorkItemQueueThreadCounts[queueIndex] <= 0 && queue.TryDequeue(out object? workItem))
{
workItems.Enqueue(workItem);
Expand Down Expand Up @@ -640,7 +650,7 @@ public void Enqueue(object callback, bool forceGlobal)
}
else
{
ConcurrentQueue<object> queue =
WorkQueue queue =
s_assignableWorkItemQueueCount > 0 && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null
? tl.assignedGlobalWorkItemQueue
: workItems;
Expand All @@ -662,7 +672,7 @@ private void EnqueueForPrioritizationExperiment(object callback, bool forceGloba
return;
}

ConcurrentQueue<object> queue;
WorkQueue queue;

// This is a rough and experimental attempt at identifying work items that should be lower priority than other
// global work items (even ones that haven't been queued yet), and to queue them to a low-priority global queue that
Expand Down Expand Up @@ -892,7 +902,7 @@ public long GlobalCount
if (dispatchNormalPriorityWorkFirst && !tl.workStealingQueue.CanSteal)
{
workQueue._dispatchNormalPriorityWorkFirst = !dispatchNormalPriorityWorkFirst;
ConcurrentQueue<object> queue =
WorkQueue queue =
s_assignableWorkItemQueueCount > 0 ? tl.assignedGlobalWorkItemQueue : workQueue.workItems;
if (!queue.TryDequeue(out workItem) && s_assignableWorkItemQueueCount > 0)
{
Expand Down Expand Up @@ -1236,7 +1246,7 @@ internal sealed class ThreadPoolWorkQueueThreadLocals

public bool isProcessingHighPriorityWorkItems;
public int queueIndex;
public ConcurrentQueue<object> assignedGlobalWorkItemQueue;
public WorkQueue assignedGlobalWorkItemQueue;
public readonly ThreadPoolWorkQueue workQueue;
public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue;
public readonly Thread currentThread;
Expand Down Expand Up @@ -1272,17 +1282,9 @@ public void TransferLocalWork()
}
}

// A strongly typed callback for ThreadPoolTypedWorkItemQueue<T, TCallback>.
// This way we avoid the indirection of a delegate call.
internal interface IThreadPoolTypedWorkItemQueueCallback<T>
{
static abstract void Invoke(T item);
}
#if TARGET_WINDOWS

internal sealed class ThreadPoolTypedWorkItemQueue<T, TCallback> : IThreadPoolWorkItem
// https://github.com/dotnet/runtime/pull/69278#discussion_r871927939
where T : struct
where TCallback : struct, IThreadPoolTypedWorkItemQueueCallback<T>
internal sealed class ThreadPoolTypedWorkItemQueue : IThreadPoolWorkItem
{
// The scheme works as follows:
// - From NotScheduled, the only transition is to Scheduled when new items are enqueued and a TP work item is enqueued to process them.
Expand All @@ -1301,17 +1303,17 @@ private enum QueueProcessingStage
}

private QueueProcessingStage _queueProcessingStage;
private readonly ConcurrentQueue<T> _workItems = new ConcurrentQueue<T>();
private readonly ConcurrentQueue<IOCompletionPollerEvent> _workItems = new();

public int Count => _workItems.Count;

public void Enqueue(T workItem)
public void Enqueue(IOCompletionPollerEvent workItem)
{
BatchEnqueue(workItem);
CompleteBatchEnqueue();
}

public void BatchEnqueue(T workItem) => _workItems.Enqueue(workItem);
public void BatchEnqueue(IOCompletionPollerEvent workItem) => _workItems.Enqueue(workItem);
public void CompleteBatchEnqueue()
{
// Only enqueue a work item if the stage is NotScheduled.
Expand Down Expand Up @@ -1353,7 +1355,7 @@ private void UpdateQueueProcessingStage(bool isQueueEmpty)

void IThreadPoolWorkItem.Execute()
{
T workItem;
IOCompletionPollerEvent workItem;
while (true)
{
Debug.Assert(_queueProcessingStage == QueueProcessingStage.Scheduled);
Expand Down Expand Up @@ -1396,7 +1398,7 @@ void IThreadPoolWorkItem.Execute()
int startTimeMs = Environment.TickCount;
while (true)
{
TCallback.Invoke(workItem);
workItem.Invoke();

// This work item processes queued work items until certain conditions are met, and tracks some things:
// - Keep track of the number of work items processed, it will be added to the counter later
Expand Down Expand Up @@ -1429,6 +1431,8 @@ void IThreadPoolWorkItem.Execute()
}
}

#endif

public delegate void WaitCallback(object? state);

public delegate void WaitOrTimerCallback(object? state, bool timedOut); // signaled or timed out
Expand Down Expand Up @@ -1918,7 +1922,7 @@ internal static IEnumerable<object> GetQueuedWorkItems()
}

// Enumerate assignable global queues
foreach (ConcurrentQueue<object> queue in s_workQueue._assignableWorkItemQueues)
foreach (WorkQueue queue in s_workQueue._assignableWorkItemQueues)
{
foreach (object workItem in queue)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;

namespace System.Text.Json.Serialization.Converters
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Concurrent;
using System.Text.Json;
using System.Diagnostics.CodeAnalysis;

namespace SerializerTrimmingTest
{
Expand All @@ -11,6 +13,8 @@ namespace SerializerTrimmingTest
/// </summary>
internal class Program
{
// NOTE: ConcurrentQueue is only trimming safe because it's used by runtime thread pool. Except on single-threaded runtimes, where public parameterless constructor is trimmed.
[DynamicDependency(DynamicallyAccessedMemberTypes.PublicParameterlessConstructor, typeof(ConcurrentQueue<int>))]
static int Main(string[] args)
{
string json = "[1]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Concurrent;
using System.Text.Json;
using System.Diagnostics.CodeAnalysis;

/// <summary>
/// Tests that the System.Collections.NonGeneric dll is not dragged into a user's app
Expand All @@ -14,6 +15,8 @@
/// </summary>
class Program
{
// NOTE: ConcurrentQueue is only trimming safe because it's used by runtime thread pool. Except on single-threaded runtimes, where public parameterless constructor is trimmed.
[DynamicDependency(DynamicallyAccessedMemberTypes.PublicParameterlessConstructor, typeof(ConcurrentQueue<int>))]
static int Main(string[] args)
{
// Test serialization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@

<FeatureMono>true</FeatureMono>
<FeatureWasmManagedThreads Condition="('$(TargetsBrowser)' == 'true' or '$(TargetsWasi)' == 'true') and '$(WasmEnableThreads)' == 'true'">true</FeatureWasmManagedThreads>
<FeatureSingleThread Condition="('$(TargetsBrowser)' == 'true' or '$(TargetsWasi)' == 'true') and '$(WasmEnableThreads)' != 'true'">true</FeatureSingleThread>
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
<FeaturePortableTimer Condition="('$(TargetsBrowser)' != 'true' and '$(TargetsWasi)' != 'true') or '$(FeatureWasmManagedThreads)' == 'true'">true</FeaturePortableTimer>
<FeaturePortableThreadPool Condition="('$(TargetsBrowser)' != 'true' and '$(TargetsWasi)' != 'true') or '$(FeatureWasmManagedThreads)' == 'true'">true</FeaturePortableThreadPool>
<FeaturePerfTracing Condition="('$(TargetsBrowser)' != 'true' and '$(TargetsWasi)' != 'true')">true</FeaturePerfTracing>
Expand All @@ -121,6 +122,7 @@
<DefineConstants Condition="'$(FeaturePerfTracing)' == 'true'">$(DefineConstants);FEATURE_PERFTRACING</DefineConstants>
<DefineConstants Condition="'$(FeatureObjCMarshal)' == 'true'">$(DefineConstants);FEATURE_OBJCMARSHAL</DefineConstants>
<DefineConstants Condition="'$(FeatureWasmManagedThreads)' == 'true'">$(DefineConstants);FEATURE_WASM_MANAGED_THREADS</DefineConstants>
<DefineConstants Condition="'$(FeatureSingleThread)' == 'true'">$(DefineConstants);FEATURE_SINGLE_THREADED</DefineConstants>
</PropertyGroup>

<!-- ILLinker settings -->
Expand Down
Loading